wip for partial cancel
Signed-off-by: vsoch <[email protected]>
vsoch committed Jan 31, 2025
1 parent 43b93fa commit 61cfda9
Showing 4 changed files with 137 additions and 52 deletions.
147 changes: 127 additions & 20 deletions pkg/fluxqueue/strategy/workers/job.go
@@ -2,7 +2,6 @@ package workers

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
@@ -16,7 +15,7 @@ import (
api "github.com/converged-computing/fluxqueue/api/v1alpha1"
"github.com/converged-computing/fluxqueue/pkg/defaults"
"github.com/converged-computing/fluxqueue/pkg/fluxqueue/queries"
"github.com/converged-computing/fluxqueue/pkg/types"
jgf "github.com/converged-computing/fluxqueue/pkg/jgf"
"github.com/riverqueue/river"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
patchTypes "k8s.io/apimachinery/pkg/types"
@@ -126,15 +125,15 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
}

// Now get the nodes. These are actually cores assigned to nodes, so we need to keep count
- nodes, err := parseNodes(response.Allocation, job.Args.Cores)
+ nodes, cancelResponses, err := parseNodes(response.Allocation, job.Args.Cores)
if err != nil {
wlog.Info("Error parsing nodes from fluxion response", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err)
return err
}
wlog.Info("Fluxion allocation response", "Nodes", nodes)

// Unsuspend the job or ungate the pods, adding the node assignments as labels for the scheduler
- err = w.releaseJob(ctx, job.Args, fluxID, nodes)
+ err = w.releaseJob(ctx, job.Args, fluxID, nodes, cancelResponses)
if err != nil {
return err
}
Expand All @@ -143,7 +142,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
}

// Release job will unsuspend a job or ungate pods to allow for scheduling
- func (w JobWorker) releaseJob(ctx context.Context, args JobArgs, fluxID int64, nodes []string) error {
+ func (w JobWorker) releaseJob(ctx context.Context, args JobArgs, fluxID int64, nodes []string, cancelResponses []string) error {
var err error

if args.Type == api.JobWrappedJob.String() {
@@ -211,42 +210,85 @@ func (w JobWorker) reserveJob(ctx context.Context, args JobArgs, fluxID int64) error {
// so the group size will likely not coincide with the number of nodes. For
// this reason, we have to divide to place them. The final number should
// match the group size.
- func parseNodes(allocation string, cores int32) ([]string, error) {
+ func parseNodes(allocation string, cores int32) ([]string, []string, error) {

// We can eventually send over more metadata, for now just a list of nodes
nodes := []string{}

- // The response is the graph with assignments. Here we parse the graph into a struct to get nodes.
- var graph types.AllocationResponse
- err := json.Unmarshal([]byte(allocation), &graph)
+ // We also need to save a corresponding cancel request
+ cancelRequests := []string{}
+
+ // Parse the allocation back into a JGF graph
+ g, err := jgf.LoadFluxJGF(allocation)
if err != nil {
- return nodes, err
+ return nodes, cancelRequests, err
}
+ fmt.Println(g)

// For each pod, we will need to be able to do partial cancel.
// We can do this by saving the initial graph (without cores)
// and adding them on to the cancel request. We first need a lookup
// for the path between cluster->subnet->nodes->cores.
// This logic will need to be updated if we change the graph.
nodeLookup := map[string]jgf.Node{}

// Store nodes based on paths
nodePaths := map[string]jgf.Node{}
edgeLookup := map[string][]jgf.Edge{}

// Parse nodes first so we can match the containment path to the host
for _, node := range g.Graph.Nodes {
nodeLookup[node.Id] = node
nodePaths[node.Metadata.Paths["containment"]] = node
}

// Build an edge lookup so we can add connected nodes later.
// We need to be able to map a node path to a list of edges;
// the node path gets us the node id (source).
var addEdge = func(node *jgf.Node, edge *jgf.Edge) {
path := node.Metadata.Paths["containment"]
_, ok := edgeLookup[path]
if !ok {
edgeLookup[path] = []jgf.Edge{}
}
edgeLookup[path] = append(edgeLookup[path], *edge)
}
for _, edge := range g.Graph.Edges {
targetNode := nodeLookup[edge.Target]
sourceNode := nodeLookup[edge.Source]
addEdge(&targetNode, &edge)
addEdge(&sourceNode, &edge)
}

// Parse nodes first so we can match the containment path to the host
lookup := map[string]string{}
- for _, node := range graph.Graph.Nodes {
+ for _, node := range g.Graph.Nodes {
+ nodePath := node.Metadata.Paths["containment"]
+ nodeLookup[fmt.Sprintf("%d", node.Metadata.Id)] = node
if node.Metadata.Type == "node" {
- nodePath := node.Metadata.Paths["containment"]
nodeId := node.Metadata.Basename
lookup[nodePath] = nodeId
}
}

// We also need to know the exact cores that are assigned to each node
coresByNode := map[string][]jgf.Node{}

// We are going to first make a count of cores per node. We do this
// by parsing the containment path. It should always look like:
// "/cluster0/0/kind-worker1/core0 for a core
coreCounts := map[string]int32{}
- for _, node := range graph.Graph.Nodes {
+ for _, node := range g.Graph.Nodes {
+ path := node.Metadata.Paths["containment"]

if node.Metadata.Type == "core" {
- corePath := node.Metadata.Paths["containment"]
coreName := fmt.Sprintf("core%d", node.Metadata.Id)
- nodePath := strings.TrimRight(corePath, "/"+coreName)
+ nodePath := strings.TrimSuffix(path, "/"+coreName)
nodeId, ok := lookup[nodePath]

// This shouldn't happen, but if it does, we should catch it
if !ok {
- return nodes, fmt.Errorf("unknown node path %s", nodePath)
+ return nodes, cancelRequests, fmt.Errorf("unknown node path %s", nodePath)
}

// Update core counts for the node
@@ -257,21 +299,86 @@ func parseNodes(allocation string, cores int32) ([]string, error) {

// Each core is one
coreCounts[nodeId] += 1

// Keep the list of core vertices assigned to each physical node
// We do this based on ids so we can use the edge lookup
assignedCores, ok := coresByNode[nodeId]
if !ok {
assignedCores = []jgf.Node{}
}
assignedCores = append(assignedCores, node)
coresByNode[nodeId] = assignedCores
}
}
fmt.Printf("Distributing %d cores per pod into core counts ", cores)
fmt.Println(coreCounts)

// Now we need to divide by the slot size (number of cores per pod)
- // and add those nodes to a list (there will be repeats)
+ // and add those nodes to a list (there will be repeats). For each slot
+ // (pod) we need to generate a JGF that includes resources for cancel.
for nodeId, totalCores := range coreCounts {
fmt.Printf("Node %s has %d cores to fit %d core(s)\n", nodeId, totalCores, cores)
fmt.Printf("Node %s has %d cores across slots to fit %d core(s) per slot\n", nodeId, totalCores, cores)
numberSlots := totalCores / cores
for range numberSlots {

// Prepare a graph for a cancel response
graph := jgf.NewFluxJGF()
seenEdges := map[string]bool{}
coreNodes := coresByNode[nodeId]

// addNewEdges adds a path's edges to the graph if we haven't added them yet
var addNewEdges = func(path string) {
addEdges, ok := edgeLookup[path]
if ok {
for _, addEdge := range addEdges {
edgeId := fmt.Sprintf("%s-%s", addEdge.Source, addEdge.Target)
_, alreadyAdded := seenEdges[edgeId]
if !alreadyAdded {
graph.Graph.Edges = append(graph.Graph.Edges, addEdge)
seenEdges[edgeId] = true
}
}
}
}

// The cancel response needs only units from the graph associated
// with the specific cores assigned.
for _, coreNode := range coreNodes {
path := coreNode.Metadata.Paths["containment"]
_, ok := graph.NodeMap[path]
if !ok {
graph.NodeMap[path] = coreNode
graph.Graph.Nodes = append(graph.Graph.Nodes, coreNode)
addNewEdges(path)
}
// Parse the entire path and add ancestor nodes up to the root
parts := strings.Split(path, "/")
for idx := range parts {
ancestorPath := strings.Join(parts[0:idx], "/")
// Skip the empty prefixes produced by the leading slash
if ancestorPath == "" {
continue
}
fmt.Println(ancestorPath)
_, ok := graph.NodeMap[ancestorPath]
if !ok {
graph.NodeMap[ancestorPath] = nodePaths[ancestorPath]
graph.Graph.Nodes = append(graph.Graph.Nodes, nodePaths[ancestorPath])
addNewEdges(ancestorPath)
}
}
}
nodes = append(nodes, nodeId)

// Serialize the cancel request to string
graphStr, err := graph.ToJson()
if err != nil {
return nodes, cancelRequests, err
}
cancelRequests = append(cancelRequests, graphStr)
fmt.Println(graphStr)
}
}
- return nodes, nil
+ return nodes, cancelRequests, nil
}

// Unsuspend the job, adding an annotation for nodes along with the fluxion scheduler
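As a sketch of the placement arithmetic in parseNodes: cores are counted per node from their containment paths, then divided by the per-pod core count to decide how many slots (pods) land on each node. The paths and the plain suffix split below are illustrative stand-ins for the commit's JGF lookups:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical core containment paths from a fluxion allocation
	corePaths := []string{
		"/cluster0/0/kind-worker1/core0",
		"/cluster0/0/kind-worker1/core1",
		"/cluster0/0/kind-worker2/core0",
		"/cluster0/0/kind-worker2/core1",
		"/cluster0/0/kind-worker2/core2",
		"/cluster0/0/kind-worker2/core3",
	}
	// Cores requested per pod (the "slot" size)
	cores := int32(2)

	// Count cores per node by trimming the trailing "/coreN" component
	coreCounts := map[string]int32{}
	for _, path := range corePaths {
		node := path[:strings.LastIndex(path, "/")]
		coreCounts[node]++
	}

	// One entry per slot: kind-worker1 once, kind-worker2 twice
	nodes := []string{}
	for node, total := range coreCounts {
		for range total / cores {
			nodes = append(nodes, node)
		}
	}
	fmt.Println(nodes)
}
```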
9 changes: 8 additions & 1 deletion pkg/jgf/jgf.go
@@ -51,6 +51,13 @@ func NewFluxJGF() FluxJGF {
}
}

// LoadFluxJGF loads a graph payload into a JGF structure
func LoadFluxJGF(payload string) (FluxJGF, error) {
var graph FluxJGF
err := json.Unmarshal([]byte(payload), &graph)
return graph, err
}

// ToJson returns a Json string of the graph
func (g *FluxJGF) ToJson() (string, error) {
toprint, err := json.MarshalIndent(g.Graph, "", "\t")
@@ -81,7 +88,7 @@ func (g *FluxJGF) MakeBidirectionalEdge(parent, child string) {

// MakeEdge creates an edge for the JGF
func (g *FluxJGF) MakeEdge(source string, target string, contains string) {
- newedge := edge{
+ newedge := Edge{
Source: source,
Target: target,
Metadata: edgeMetadata{Subsystem: containmentKey},
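A minimal round-trip sketch of the new helper, assuming the payload carries the graph under a top-level "graph" key (as the allocation strings parsed in job.go do) and that Node's id unmarshals from the "id" key:

```go
package main

import (
	"fmt"
	"log"

	jgf "github.com/converged-computing/fluxqueue/pkg/jgf"
)

func main() {
	// Hypothetical minimal allocation payload
	payload := `{"graph": {"nodes": [{"id": "0"}], "edges": []}}`

	// Parse the payload into a FluxJGF
	g, err := jgf.LoadFluxJGF(payload)
	if err != nil {
		log.Fatal(err)
	}

	// Walk the parsed nodes, as parseNodes does with the allocation
	for _, node := range g.Graph.Nodes {
		fmt.Println(node.Id)
	}

	// ToJson serializes the inner graph back to an indented string
	out, err := g.ToJson()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(out)
}
```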
4 changes: 2 additions & 2 deletions pkg/jgf/types.go
@@ -8,7 +8,7 @@ type Node struct {
Metadata nodeMetadata `json:"metadata,omitempty"`
}

- type edge struct {
+ type Edge struct {
Source string `json:"source"`
Relation string `json:"relation,omitempty"`
Target string `json:"target"`
@@ -36,7 +36,7 @@ type nodeMetadata struct {

type graph struct {
Nodes []Node `json:"nodes"`
- Edges []edge `json:"edges"`
+ Edges []Edge `json:"edges"`
// Metadata metadata `json:"metadata,omitempty"`
Directed bool `json:"directed,omitempty"`
}
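Renaming edge to Edge is what lets job.go declare maps like map[string][]jgf.Edge and copy edges between graphs. A small sketch of constructing one outside the package, with hypothetical ids and path; only exported fields are set, since the metadata type stays internal to pkg/jgf:

```go
package main

import (
	"fmt"

	jgf "github.com/converged-computing/fluxqueue/pkg/jgf"
)

func main() {
	// Build an edge lookup keyed by containment path, as the worker does
	edgeLookup := map[string][]jgf.Edge{}

	// Hypothetical source/target node ids
	e := jgf.Edge{Source: "2", Target: "3", Relation: "contains"}

	path := "/cluster0/0/kind-worker1"
	edgeLookup[path] = append(edgeLookup[path], e)
	fmt.Println(edgeLookup)
}
```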
29 changes: 0 additions & 29 deletions pkg/types/types.go

This file was deleted.
