Skip to content

Commit

Permalink
[kubectl-plugin] Add support for retrieving logs for different ray re…
Browse files Browse the repository at this point in the history
…source types (#2677)
  • Loading branch information
chiayi authored Jan 9, 2025
1 parent 7a768f9 commit e959950
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 23 deletions.
86 changes: 71 additions & 15 deletions kubectl-plugin/pkg/cmd/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"path"
"path/filepath"
"strings"

"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
Expand All @@ -24,6 +25,8 @@ import (
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/templates"

"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/completion"
)

Expand All @@ -36,6 +39,7 @@ type ClusterLogOptions struct {
outputDir string
nodeType string
ResourceName string
ResourceType util.ResourceType
}

var (
Expand Down Expand Up @@ -78,13 +82,13 @@ func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
cmdFactory := cmdutil.NewFactory(options.configFlags)

cmd := &cobra.Command{
Use: "log (RAYCLUSTER) [--out-dir DIR_PATH] [--node-type all|head|worker]",
Use: "log (RAYCLUSTER | TYPE/NAME) [--out-dir DIR_PATH] [--node-type all|head|worker]",
Short: "Get ray cluster log",
Long: logLong,
Example: logExample,
Aliases: []string{"logs"},
SilenceUsage: true,
ValidArgsFunction: completion.RayClusterCompletionFunc(cmdFactory),
ValidArgsFunction: completion.RayClusterResourceNameCompletionFunc(cmdFactory),
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Complete(cmd, args); err != nil {
return err
Expand All @@ -106,10 +110,37 @@ func (options *ClusterLogOptions) Complete(cmd *cobra.Command, args []string) er
return cmdutil.UsageErrorf(cmd, "%s", cmd.Use)
}

options.ResourceName = args[0]
if *options.configFlags.Namespace == "" {
*options.configFlags.Namespace = "default"
}

typeAndName := strings.Split(args[0], "/")
if len(typeAndName) == 1 {
options.ResourceType = util.RayCluster
options.ResourceName = typeAndName[0]
} else {
if len(typeAndName) != 2 || typeAndName[1] == "" {
return cmdutil.UsageErrorf(cmd, "invalid resource type/name: %s", args[0])
}

switch strings.ToLower(typeAndName[0]) {
case string(util.RayCluster):
options.ResourceType = util.RayCluster
case string(util.RayJob):
options.ResourceType = util.RayJob
case string(util.RayService):
options.ResourceType = util.RayService
default:
return cmdutil.UsageErrorf(cmd, "unsupported resource type: %s", typeAndName[0])
}

options.ResourceName = typeAndName[1]
}

if options.nodeType == "" {
options.nodeType = "all"
} else {
options.nodeType = strings.ToLower(options.nodeType)
}

return nil
Expand Down Expand Up @@ -159,45 +190,70 @@ func (options *ClusterLogOptions) Validate() error {
}

func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Factory) error {
kubeClientSet, err := factory.KubernetesClientSet()
clientSet, err := client.NewClient(factory)
if err != nil {
return fmt.Errorf("failed to retrieve kubernetes client set: %w", err)
}

// Resolve the name of the RayCluster backing the requested resource; for a RayJob or RayService it is read from the resource's status
var clusterName string
switch options.ResourceType {
case util.RayCluster:
clusterName = options.ResourceName
case util.RayJob:
rayJob, err := clientSet.RayClient().RayV1().RayJobs(*options.configFlags.Namespace).Get(ctx, options.ResourceName, v1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to retrieve rayjob info for %s: %w", options.ResourceName, err)
}
clusterName = rayJob.Status.RayClusterName
case util.RayService:
rayService, err := clientSet.RayClient().RayV1().RayServices(*options.configFlags.Namespace).Get(ctx, options.ResourceName, v1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to retrieve rayjob info for %s: %w", options.ResourceName, err)
}
clusterName = rayService.Status.ActiveServiceStatus.RayClusterName
default:
return fmt.Errorf("unsupported resource type: %s", options.ResourceType)
}

// set the list options for the specified nodetype
var listopts v1.ListOptions
if options.nodeType == "all" {
switch options.nodeType {
case "all":
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/cluster=%s", options.ResourceName),
LabelSelector: fmt.Sprintf("ray.io/cluster=%s", clusterName),
}
} else if options.nodeType == "head" {
case "head":
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/node-type=head, ray.io/cluster=%s", options.ResourceName),
LabelSelector: fmt.Sprintf("ray.io/node-type=head, ray.io/cluster=%s", clusterName),
}
} else if options.nodeType == "worker" {
case "worker":
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/node-type=worker, ray.io/cluster=%s", options.ResourceName),
LabelSelector: fmt.Sprintf("ray.io/node-type=worker, ray.io/cluster=%s", clusterName),
}
default:
return fmt.Errorf("Unknown ray resource node type: %s", options.nodeType)
}

// Get list of nodes that are considered the specified node type
rayNodes, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
rayNodes, err := clientSet.KubernetesClient().CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
if err != nil {
return fmt.Errorf("failed to retrieve head node for RayCluster %s: %w", options.ResourceName, err)
return fmt.Errorf("failed to retrieve head node for RayCluster %s: %w", clusterName, err)
}
if len(rayNodes.Items) == 0 {
// Clean up the empty directory if the directory was generated. Since it will always be in current dir, only Remove() is used.
if deleteOutputDir {
os.Remove(options.outputDir)
}
return fmt.Errorf("No ray nodes found for resource %s", options.ResourceName)
return fmt.Errorf("No ray nodes found for resource %s", clusterName)
}

// Get a list of logs of the ray nodes.
var logList []*bytes.Buffer
for _, rayNode := range rayNodes.Items {
// Since the first container is always the ray container, we will retrieve the first container logs
containerName := rayNode.Spec.Containers[0].Name
request := kubeClientSet.CoreV1().Pods(rayNode.Namespace).GetLogs(rayNode.Name, &corev1.PodLogOptions{Container: containerName})
request := clientSet.KubernetesClient().CoreV1().Pods(rayNode.Namespace).GetLogs(rayNode.Name, &corev1.PodLogOptions{Container: containerName})

podLogs, err := request.Stream(ctx)
if err != nil {
Expand Down Expand Up @@ -235,7 +291,7 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto
}

containerName := rayNodes.Items[ind].Spec.Containers[0].Name
req := kubeClientSet.CoreV1().RESTClient().
req := clientSet.KubernetesClient().CoreV1().RESTClient().
Get().
Namespace(rayNodes.Items[ind].Namespace).
Resource("pods").
Expand Down
93 changes: 85 additions & 8 deletions kubectl-plugin/pkg/cmd/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -113,17 +114,91 @@ func (dre *FakeRemoteExecutor) CreateExecutor(_ *rest.Config, url *url.URL) (rem
}

// TestRayClusterLogComplete exercises ClusterLogOptions.Complete with a range
// of positional arguments and checks the ResourceType, ResourceName and
// nodeType it leaves on the options struct.
//
// Cases covered by the table:
//   - a bare name, expected to be treated as a RayCluster;
//   - TYPE/NAME with a mixed-case type (rayCluster, rayService, rayJob);
//   - rejected input: no args, too many args, empty type, empty name and an
//     unknown type.
//
// nodeType is never set by the cases, so every successful case expects the
// "all" default.
func TestRayClusterLogComplete(t *testing.T) {
	testStreams, _, _, _ := genericclioptions.NewTestIOStreams()
	fakeClusterLogOptions := NewClusterLogOptions(testStreams)
	fakeArgs := []string{"Expectedoutput"}

	cmd := &cobra.Command{Use: "log"}

	// Single-argument smoke check; its result is asserted after the table
	// declaration below.
	// NOTE(review): this overlaps with the first table case (bare name) —
	// confirm whether both are meant to remain.
	err := fakeClusterLogOptions.Complete(cmd, fakeArgs)
	tests := []struct {
		name                 string
		nodeType             string
		expectedResourceType util.ResourceType
		expectedResourceName string
		expectedNodeType     string
		args                 []string
		hasErr               bool
	}{
		{
			name:                 "valid request with raycluster with empty nodetype input",
			expectedResourceType: util.RayCluster,
			expectedResourceName: "test-raycluster",
			expectedNodeType:     "all",
			args:                 []string{"test-raycluster"},
			hasErr:               false,
		},
		{
			name:                 "valid request with raycluster",
			expectedResourceType: util.RayCluster,
			expectedResourceName: "test-raycluster",
			args:                 []string{"rayCluster/test-raycluster"},
			expectedNodeType:     "all",
			hasErr:               false,
		},
		{
			name:                 "valid request with rayservice",
			expectedResourceType: util.RayService,
			expectedResourceName: "test-rayService",
			args:                 []string{"rayService/test-rayService"},
			expectedNodeType:     "all",
			hasErr:               false,
		},
		{
			name:                 "valid request with rayjob",
			expectedResourceType: util.RayJob,
			expectedResourceName: "test-rayJob",
			args:                 []string{"rayJob/test-rayJob"},
			expectedNodeType:     "all",
			hasErr:               false,
		},
		{
			name:   "invalid args (no args)",
			args:   []string{},
			hasErr: true,
		},
		{
			name:   "invalid args (too many args)",
			args:   []string{"raycluster/test-raycluster", "extra-arg"},
			hasErr: true,
		},
		{
			name:   "invalid args (no resource type)",
			args:   []string{"/test-resource"},
			hasErr: true,
		},
		{
			name:   "invalid args (no resource name)",
			args:   []string{"raycluster/"},
			hasErr: true,
		},
		{
			name:   "invalid args (invalid resource type)",
			args:   []string{"invalid-type/test-resource"},
			hasErr: true,
		},
	}

	// Assertions for the smoke check above. Arguments are given as
	// (expected, actual) so failure output labels them correctly and matches
	// the table-driven assertions further down.
	assert.Equal(t, "all", fakeClusterLogOptions.nodeType)
	assert.Nil(t, err)
	assert.Equal(t, fakeArgs[0], fakeClusterLogOptions.ResourceName)
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Fresh options per case so state from one case cannot leak into
			// the next.
			testStreams, _, _, _ := genericclioptions.NewTestIOStreams()
			fakeClusterLogOptions := NewClusterLogOptions(testStreams)
			err := fakeClusterLogOptions.Complete(cmd, tc.args)
			if tc.hasErr {
				assert.NotNil(t, err)
			} else {
				assert.Nil(t, err)
				assert.Equal(t, tc.expectedResourceType, fakeClusterLogOptions.ResourceType)
				assert.Equal(t, tc.expectedResourceName, fakeClusterLogOptions.ResourceName)
				assert.Equal(t, tc.expectedNodeType, fakeClusterLogOptions.nodeType)
			}
		})
	}
}

func TestRayClusterLogValidate(t *testing.T) {
Expand Down Expand Up @@ -283,6 +358,8 @@ func TestRayClusterLogRun(t *testing.T) {
fakeClusterLogOptions.Executor = &FakeRemoteExecutor{}
fakeClusterLogOptions.ResourceName = "test-cluster"
fakeClusterLogOptions.outputDir = fakeDir
fakeClusterLogOptions.ResourceType = util.RayCluster
fakeClusterLogOptions.nodeType = "all"

// Create list of fake ray heads
rayHeadsList := &v1.PodList{
Expand Down

0 comments on commit e959950

Please sign in to comment.