Merge pull request #2192 from FabianKramm/refactor-connect
refactor: vcluster connect prefer background proxy
FabianKramm authored Oct 1, 2024
2 parents 61df806 + 54c5ace commit 87d99a6
Showing 4 changed files with 12 additions and 270 deletions.
11 changes: 5 additions & 6 deletions pkg/cli/connect_helm.go
@@ -329,6 +329,7 @@ func (cmd *connectHelm) getVClusterKubeConfig(ctx context.Context, vcluster *fin

// check if the vcluster is exposed and set server
if vcluster.Name != "" && cmd.Server == "" && len(command) == 0 {
+ // check if local kubernetes / can be exposed
err = cmd.setServerIfExposed(ctx, vcluster, kubeConfig)
if err != nil {
return nil, err
@@ -338,14 +339,14 @@ func (cmd *connectHelm) getVClusterKubeConfig(ctx context.Context, vcluster *fin
if cmd.Server == "" && cmd.BackgroundProxy {
if localkubernetes.IsDockerInstalledAndUpAndRunning() {
// start background container
- server, err := localkubernetes.CreateBackgroundProxyContainer(ctx, vcluster.Name, cmd.Namespace, cmd.kubeClientConfig, kubeConfig, cmd.LocalPort, cmd.Log)
+ cmd.Server, err = localkubernetes.CreateBackgroundProxyContainer(ctx, vcluster.Name, cmd.Namespace, cmd.kubeClientConfig, kubeConfig, cmd.LocalPort, cmd.Log)
if err != nil {
cmd.Log.Warnf("Error exposing local vcluster, will fallback to port-forwarding: %v", err)
cmd.BackgroundProxy = false
}
- cmd.Server = server
} else {
cmd.Log.Debugf("Docker is not installed, so skip background proxy")
cmd.BackgroundProxy = false
}
}
}
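
Taken together, the two hunks above make the background proxy the preferred fallback: when no server address is known and Docker is up, the CLI now writes the proxy address straight into cmd.Server, and any failure simply downgrades the connection to plain port-forwarding. Below is a minimal, self-contained sketch of that decision flow; connectOpts, resolveServer, and the startProxy callback are illustrative stand-ins, not the real vcluster types.

    package main

    import (
    	"errors"
    	"fmt"
    )

    // connectOpts mimics the two fields of the connect command that matter
    // for choosing a connection strategy (names are stand-ins).
    type connectOpts struct {
    	Server          string
    	BackgroundProxy bool
    }

    // resolveServer sketches the order of preference after this change:
    // an already-exposed service address first, then the docker background
    // proxy, and finally (Server left empty) plain port-forwarding.
    func resolveServer(opts *connectOpts, exposedServer string, dockerRunning bool, startProxy func() (string, error)) {
    	if opts.Server == "" && exposedServer != "" {
    		opts.Server = exposedServer
    		return
    	}
    	if opts.Server == "" && opts.BackgroundProxy {
    		if !dockerRunning {
    			// no docker, so skip the background proxy entirely
    			opts.BackgroundProxy = false
    			return
    		}
    		server, err := startProxy()
    		if err != nil {
    			// any proxy error downgrades the connection to port-forwarding
    			opts.BackgroundProxy = false
    			return
    		}
    		opts.Server = server
    	}
    }

    func main() {
    	opts := &connectOpts{BackgroundProxy: true}
    	resolveServer(opts, "", true, func() (string, error) {
    		return "", errors.New("container failed to start")
    	})
    	// Server stays empty and BackgroundProxy is false: port-forwarding wins.
    	fmt.Printf("server=%q backgroundProxy=%v\n", opts.Server, opts.BackgroundProxy)
    }
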
@@ -445,16 +446,14 @@ func (cmd *connectHelm) setServerIfExposed(ctx context.Context, vcluster *find.V
}

// not a load balancer? Then don't wait
- if service.Spec.Type == corev1.ServiceTypeNodePort {
- server, err := localkubernetes.ExposeLocal(ctx, vcluster.Name, cmd.Namespace, &cmd.rawConfig, vClusterConfig, service, cmd.LocalPort, cmd.Log)
+ if service.Spec.Type != corev1.ServiceTypeLoadBalancer {
+ server, err := localkubernetes.ExposeLocal(ctx, &cmd.rawConfig, vClusterConfig, service)
if err != nil {
cmd.Log.Warnf("Error exposing local vcluster, will fallback to port-forwarding: %v", err)
}

cmd.Server = server
return true, nil
- } else if service.Spec.Type != corev1.ServiceTypeLoadBalancer {
- return true, nil
}

if len(service.Status.LoadBalancer.Ingress) == 0 {
5 changes: 0 additions & 5 deletions pkg/cli/delete_helm.go
@@ -285,11 +285,6 @@ func (cmd *deleteHelm) prepare(vCluster *find.VCluster) error {
return err
}

- err = localkubernetes.CleanupLocal(vCluster.Name, vCluster.Namespace, &rawConfig, cmd.log)
- if err != nil {
- cmd.log.Warnf("error cleaning up: %v", err)
- }
-
// construct proxy name
proxyName := find.VClusterConnectBackgroundProxyName(vCluster.Name, vCluster.Namespace, rawConfig.CurrentContext)
_ = localkubernetes.CleanupBackgroundProxy(proxyName, cmd.log)
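
With the per-flavor CleanupLocal call gone, deletion only needs to tear down the generic background proxy container. The sketch below shows what such a docker-based cleanup amounts to, assuming the proxy is an ordinary container named after the vcluster context; the real helper is localkubernetes.CleanupBackgroundProxy, whose exact behavior may differ, and the container name used in main is made up for the example.

    package main

    import (
    	"fmt"
    	"os/exec"
    	"strings"
    )

    // cleanupBackgroundProxy is an illustrative stand-in for
    // localkubernetes.CleanupBackgroundProxy: if a container with the given
    // name exists, force-remove it; otherwise do nothing.
    func cleanupBackgroundProxy(name string) error {
    	// "docker inspect" exits non-zero when the container does not exist
    	if err := exec.Command("docker", "inspect", name).Run(); err != nil {
    		return nil // nothing to clean up
    	}
    	out, err := exec.Command("docker", "rm", "-f", name).CombinedOutput()
    	if err != nil {
    		return fmt.Errorf("remove proxy container: %s: %w", strings.TrimSpace(string(out)), err)
    	}
    	return nil
    }

    func main() {
    	_ = cleanupBackgroundProxy("vcluster-example-proxy")
    }
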
261 changes: 4 additions & 257 deletions pkg/cli/localkubernetes/configure.go
@@ -3,11 +3,9 @@ package localkubernetes
import (
"context"
"fmt"
- "net/url"
"os"
"os/exec"
- "strconv"
"strings"
"time"

"github.com/loft-sh/log"
@@ -25,13 +23,10 @@
func (c ClusterType) LocalKubernetes() bool {
return c == ClusterTypeDockerDesktop ||
c == ClusterTypeRancherDesktop ||
- c == ClusterTypeKIND ||
- c == ClusterTypeMinikube ||
- c == ClusterTypeK3D ||
c == ClusterTypeOrbstack
}

- func ExposeLocal(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, log log.Logger) (string, error) {
+ func ExposeLocal(ctx context.Context, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service) (string, error) {
// Timeout to wait for connection before falling back to port-forwarding
timeout := time.Second * 30
clusterType := DetectClusterType(rawConfig)
@@ -42,138 +37,12 @@ func ExposeLocal(ctx context.Context, vClusterName, vClusterNamespace string, ra
return directConnection(ctx, vRawConfig, service, timeout)
case ClusterTypeRancherDesktop:
return directConnection(ctx, vRawConfig, service, timeout)
- case ClusterTypeKIND:
- return kindProxy(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, log)
- case ClusterTypeMinikube:
- return minikubeProxy(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, log)
- case ClusterTypeK3D:
- return k3dProxy(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, log)
default:
}

return "", nil
}

- func CleanupLocal(vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, log log.Logger) error {
- if rawConfig == nil {
- return errors.New("nil rawConfig")
- }
-
- clusterType := DetectClusterType(rawConfig)
- switch clusterType {
- case ClusterTypeMinikube:
- if containerExists(rawConfig.CurrentContext) {
- return cleanupProxy(vClusterName, vClusterNamespace, rawConfig, log)
- }
-
- return nil
- case ClusterTypeKIND:
- return cleanupProxy(vClusterName, vClusterNamespace, rawConfig, log)
- case ClusterTypeK3D:
- return cleanupProxy(vClusterName, vClusterNamespace, rawConfig, log)
- default:
- }
-
- return nil
- }

- func k3dProxy(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, timeout time.Duration, log log.Logger) (string, error) {
- if service == nil {
- return "", errors.New("service is nil")
- }
- if len(service.Spec.Ports) == 0 {
- return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
- }
-
- // see if we already have a proxy container running
- server, err := getServerFromExistingProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, log)
- if err != nil {
- return "", err
- } else if server != "" {
- return server, nil
- }
-
- k3dName := strings.TrimPrefix(rawConfig.CurrentContext, "k3d-")
- return createProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, "k3d-"+k3dName+"-server-0", "k3d-"+k3dName, log)
- }

- func minikubeProxy(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, timeout time.Duration, log log.Logger) (string, error) {
- if service == nil {
- return "", errors.New("nil service")
- }
- if len(service.Spec.Ports) == 0 {
- return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
- }
-
- // check if docker driver or vm
- minikubeName := rawConfig.CurrentContext
- if containerExists(minikubeName) {
- // see if we already have a proxy container running
- server, err := getServerFromExistingProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, log)
- if err != nil {
- return "", err
- } else if server != "" {
- return server, nil
- }
-
- // create proxy container if missing
- return createProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, minikubeName, minikubeName, log)
- }
-
- // in case other type of driver (e.g. VM on linux) is used
- // check if the service is reacheable directly via the minikube IP
- c := rawConfig.Contexts[rawConfig.CurrentContext]
- if c != nil {
- s := rawConfig.Clusters[c.Cluster]
- if s != nil {
- u, err := url.Parse(s.Server)
- if err == nil {
- splitted := strings.Split(u.Host, ":")
- server := fmt.Sprintf("https://%s:%v", splitted[0], service.Spec.Ports[0].NodePort)
-
- // workaround for the fact that vcluster certificate is not made valid for the node IPs
- // but avoid modifying the passed config before the connection is tested
- testvConfig := vRawConfig.DeepCopy()
- for _, cluster := range testvConfig.Clusters {
- if cluster == nil {
- continue
- }
-
- cluster.CertificateAuthorityData = nil
- cluster.InsecureSkipTLSVerify = true
- }
-
- // test local connection
- waitErr := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
- err = testConnectionWithServer(ctx, testvConfig, server)
- if err != nil {
- return false, nil
- }
-
- return true, nil
- })
- if waitErr != nil {
- return "", fmt.Errorf("test connection: %w %w", waitErr, err)
- }
-
- // now it's safe to modify the vRawConfig struct that was passed in as a pointer
- for _, cluster := range vRawConfig.Clusters {
- if cluster == nil {
- continue
- }
-
- cluster.CertificateAuthorityData = nil
- cluster.InsecureSkipTLSVerify = true
- }
-
- return server, nil
- }
- }
- }
-
- return "", nil
- }

func CleanupBackgroundProxy(proxyName string, log log.Logger) error {
// check if background proxy container already exists
if containerExists(proxyName) {
@@ -191,42 +60,6 @@ func CleanupBackgroundProxy(proxyName string, log log.Logger) error {
return nil
}

- func cleanupProxy(vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, log log.Logger) error {
- // construct proxy name
- proxyName := find.VClusterContextName(vClusterName, vClusterNamespace, rawConfig.CurrentContext)
-
- // check if proxy container already exists
- cmd := exec.Command(
- "docker",
- "stop",
- proxyName,
- )
- log.Infof("Stopping docker proxy...")
- _, _ = cmd.Output()
- return nil
- }

- func kindProxy(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, timeout time.Duration, log log.Logger) (string, error) {
- if service == nil {
- return "", errors.New("nil service")
- }
- if len(service.Spec.Ports) == 0 {
- return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
- }
-
- // see if we already have a proxy container running
- server, err := getServerFromExistingProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, log)
- if err != nil {
- return "", err
- } else if server != "" {
- return server, nil
- }
-
- // name is prefixed with kind- and suffixed with -control-plane
- controlPlane := strings.TrimPrefix(rawConfig.CurrentContext, "kind-") + "-control-plane"
- return createProxyContainer(ctx, vClusterName, vClusterNamespace, rawConfig, vRawConfig, service, localPort, timeout, controlPlane, "kind", log)
- }

func directServiceConnection(ctx context.Context, vRawConfig *clientcmdapi.Config, service *corev1.Service, timeout time.Duration) (string, error) {
if len(service.Spec.Ports) == 0 {
return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
@@ -250,6 +83,9 @@ func directServiceConnection(ctx context.Context, vRawConfig *clientcmdapi.Confi
}

func directConnection(ctx context.Context, vRawConfig *clientcmdapi.Config, service *corev1.Service, timeout time.Duration) (string, error) {
+ if service.Spec.Type != corev1.ServiceTypeNodePort {
+ return "", nil
+ }
if len(service.Spec.Ports) == 0 {
return "", fmt.Errorf("service has %d ports (expected 1 port)", len(service.Spec.Ports))
}
@@ -271,49 +107,6 @@ func directConnection(ctx context.Context, vRawConfig *clientcmdapi.Config, serv
return server, nil
}
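
Because setServerIfExposed now routes every non-LoadBalancer service into ExposeLocal, directConnection has to refuse service types it cannot handle itself; the added guard returns an empty server (and no error) for anything other than NodePort, which tells the caller to fall back to the background proxy or port-forwarding. Below is a compact sketch of that contract, with testConnection standing in for the real testConnectionWithServer and the localhost address chosen purely for illustration.

    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	"k8s.io/apimachinery/pkg/util/wait"
    )

    // directConnection sketch: only NodePort services are handled here; any
    // other type yields ("", nil), signalling "fall back to another strategy".
    func directConnection(ctx context.Context, svcType string, nodePort int32, testConnection func(context.Context, string) error) (string, error) {
    	if svcType != "NodePort" {
    		return "", nil
    	}
    	server := fmt.Sprintf("https://127.0.0.1:%d", nodePort) // illustrative address
    	// poll until the endpoint answers or the timeout expires, mirroring the
    	// wait.PollUntilContextTimeout pattern used throughout configure.go
    	err := wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true,
    		func(ctx context.Context) (bool, error) {
    			return testConnection(ctx, server) == nil, nil
    		})
    	if err != nil {
    		return "", fmt.Errorf("test connection: %w", err)
    	}
    	return server, nil
    }

    func main() {
    	server, err := directConnection(context.Background(), "ClusterIP", 30000,
    		func(context.Context, string) error { return nil })
    	fmt.Printf("server=%q err=%v\n", server, err) // empty server: caller falls back
    }
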

- func createProxyContainer(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, localPort int, timeout time.Duration, backendHost, network string, log log.Logger) (string, error) {
- // construct proxy name
- proxyName := find.VClusterContextName(vClusterName, vClusterNamespace, rawConfig.CurrentContext)
-
- // in general, we need to run this statement to expose the correct port for this
- // docker run -d -p LOCAL_PORT:NODE_PORT --rm -e "BACKEND_HOST=NAME-control-plane" -e "BACKEND_PORT=NODE_PORT" --network=NETWORK ghcr.io/loft-sh/docker-tcp-proxy
- cmd := exec.Command(
- "docker",
- "run",
- "-d",
- "-p",
- fmt.Sprintf("%v:%v", localPort, service.Spec.Ports[0].NodePort),
- "--rm",
- fmt.Sprintf("--name=%s", proxyName),
- "-e",
- fmt.Sprintf("BACKEND_HOST=%s", backendHost),
- "-e",
- fmt.Sprintf("BACKEND_PORT=%v", service.Spec.Ports[0].NodePort),
- fmt.Sprintf("--network=%s", network),
- "ghcr.io/loft-sh/docker-tcp-proxy",
- )
- log.Infof("Starting proxy container...")
- out, err := cmd.Output()
- if err != nil {
- return "", errors.Errorf("error starting kind proxy: %s %v", string(out), err)
- }
-
- server := fmt.Sprintf("https://127.0.0.1:%v", localPort)
- waitErr := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
- err = testConnectionWithServer(ctx, vRawConfig, server)
- if err != nil {
- return false, nil
- }
-
- return true, nil
- })
- if waitErr != nil {
- return "", fmt.Errorf("test connection: %w %w", waitErr, err)
- }
-
- return server, nil
- }

func CreateBackgroundProxyContainer(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig clientcmd.ClientConfig, vRawConfig *clientcmdapi.Config, localPort int, log log.Logger) (string, error) {
rawConfigObj, err := rawConfig.RawConfig()
if err != nil {
@@ -428,52 +221,6 @@ func testConnectionWithServer(ctx context.Context, vRawConfig *clientcmdapi.Conf
return nil
}

- func getServerFromExistingProxyContainer(ctx context.Context, vClusterName, vClusterNamespace string, rawConfig *clientcmdapi.Config, vRawConfig *clientcmdapi.Config, service *corev1.Service, log log.Logger) (string, error) {
- // construct proxy name
- proxyName := find.VClusterContextName(vClusterName, vClusterNamespace, rawConfig.CurrentContext)
-
- // check if proxy container already exists
- cmd := exec.Command(
- "docker",
- "inspect",
- proxyName,
- "-f",
- fmt.Sprintf("{{ index (index (index .HostConfig.PortBindings \"%v/tcp\") 0) \"HostPort\" }}", service.Spec.Ports[0].NodePort),
- )
- out, err := cmd.Output()
- if err == nil {
- localPort, err := strconv.Atoi(strings.TrimSpace(string(out)))
- if err == nil && localPort != 0 {
- server := fmt.Sprintf("https://127.0.0.1:%v", localPort)
- waitErr := wait.PollUntilContextTimeout(ctx, time.Second, time.Second*5, true, func(ctx context.Context) (bool, error) {
- err = testConnectionWithServer(ctx, vRawConfig, server)
- if err != nil {
- return false, nil
- }
-
- return true, nil
- })
- if waitErr != nil {
- // return err here as waitErr is only timed out
- return "", errors.Wrap(err, "test connection")
- }
-
- return server, nil
- }
- } else {
- log.Debugf("Error running docker inspect with go template: %v", err)
- }
-
- if containerExists(proxyName) {
- err := cleanupProxy(vClusterName, vClusterNamespace, rawConfig, log)
- if err != nil {
- return "", err
- }
- }
-
- return "", nil
- }

func containerExists(containerName string) bool {
cmd := exec.Command(
"docker",
5 changes: 3 additions & 2 deletions test/framework/framework.go
@@ -163,8 +163,9 @@ func CreateFramework(ctx context.Context, scheme *runtime.Scheme) error {
Debug: true,
},
ConnectOptions: cli.ConnectOptions{
- KubeConfig: vKubeconfigFile.Name(),
- LocalPort: 14550, // choosing a port that usually should be unused
+ KubeConfig:      vKubeconfigFile.Name(),
+ LocalPort:       14550, // choosing a port that usually should be unused
+ BackgroundProxy: true,
},
}
err = connectCmd.Run(ctx, []string{name})
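
The e2e framework opts into the new preferred path explicitly. The fragment below repeats the essential wiring; the three option fields are exactly those in the diff, while the surrounding command construction is abbreviated. A detached proxy container, unlike a blocking port-forward, survives for the whole test run without tying up a forwarding goroutine per connection, which is presumably why the suite enables it by default.

    // Fragment, not a full program: enable the background proxy for test
    // connections (field names as shown in the diff above).
    connectCmd.ConnectOptions = cli.ConnectOptions{
    	KubeConfig:      vKubeconfigFile.Name(),
    	LocalPort:       14550, // a port that should usually be unused
    	BackgroundProxy: true,  // prefer the docker proxy over port-forwarding
    }
    err = connectCmd.Run(ctx, []string{name})
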