From 7c4be26da375fce9098f8fd451bae9dc885ea32a Mon Sep 17 00:00:00 2001 From: Scott Leggett Date: Mon, 14 Oct 2024 16:48:43 +0800 Subject: [PATCH 1/4] feat: limit concurrent logs sessions Default limit of 32 concurrent log sessions per ssh-portal process. --- cmd/ssh-portal/serve.go | 17 +++++++++-------- internal/k8s/client.go | 5 ++++- internal/k8s/logs.go | 25 ++++++++++++++++++++++--- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/cmd/ssh-portal/serve.go b/cmd/ssh-portal/serve.go index dd498d28..339d2766 100644 --- a/cmd/ssh-portal/serve.go +++ b/cmd/ssh-portal/serve.go @@ -21,13 +21,14 @@ const ( // ServeCmd represents the serve command. type ServeCmd struct { - NATSServer string `kong:"required,env='NATS_URL',help='NATS server URL (nats://... or tls://...)'"` - SSHServerPort uint `kong:"default='2222',env='SSH_SERVER_PORT',help='Port the SSH server will listen on for SSH client connections'"` - HostKeyECDSA string `kong:"env='HOST_KEY_ECDSA',help='PEM encoded ECDSA host key'"` - HostKeyED25519 string `kong:"env='HOST_KEY_ED25519',help='PEM encoded Ed25519 host key'"` - HostKeyRSA string `kong:"env='HOST_KEY_RSA',help='PEM encoded RSA host key'"` - LogAccessEnabled bool `kong:"env='LOG_ACCESS_ENABLED',help='Allow any user who can SSH into a pod to also access its logs'"` - Banner string `kong:"env='BANNER',help='Text sent to remote users before authentication'"` + NATSServer string `kong:"required,env='NATS_URL',help='NATS server URL (nats://... or tls://...)'"` + SSHServerPort uint `kong:"default='2222',env='SSH_SERVER_PORT',help='Port the SSH server will listen on for SSH client connections'"` + HostKeyECDSA string `kong:"env='HOST_KEY_ECDSA',help='PEM encoded ECDSA host key'"` + HostKeyED25519 string `kong:"env='HOST_KEY_ED25519',help='PEM encoded Ed25519 host key'"` + HostKeyRSA string `kong:"env='HOST_KEY_RSA',help='PEM encoded RSA host key'"` + LogAccessEnabled bool `kong:"env='LOG_ACCESS_ENABLED',help='Allow any user who can SSH into a pod to also access its logs'"` + Banner string `kong:"env='BANNER',help='Text sent to remote users before authentication'"` + ConcurrentLogLimit uint `kong:"default='32',env='CONCURRENT_LOG_LIMIT',help='Maximum number of concurrent log sessions'"` } // Run the serve command to handle SSH connection requests. @@ -60,7 +61,7 @@ func (cmd *ServeCmd) Run(log *slog.Logger) error { } defer l.Close() // get kubernetes client - c, err := k8s.NewClient() + c, err := k8s.NewClient(cmd.ConcurrentLogLimit) if err != nil { return fmt.Errorf("couldn't create k8s client: %v", err) } diff --git a/internal/k8s/client.go b/internal/k8s/client.go index b68b828a..593e61af 100644 --- a/internal/k8s/client.go +++ b/internal/k8s/client.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "golang.org/x/sync/semaphore" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -24,10 +25,11 @@ type Client struct { config *rest.Config clientset kubernetes.Interface logStreamIDs sync.Map + logSem *semaphore.Weighted } // NewClient creates a new kubernetes API client. -func NewClient() (*Client, error) { +func NewClient(concurrentLogLimit uint) (*Client, error) { // create the in-cluster config config, err := rest.InClusterConfig() if err != nil { @@ -41,5 +43,6 @@ func NewClient() (*Client, error) { return &Client{ config: config, clientset: clientset, + logSem: semaphore.NewWeighted(int64(concurrentLogLimit)), }, nil } diff --git a/internal/k8s/logs.go b/internal/k8s/logs.go index 6081e797..ccadc0ac 100644 --- a/internal/k8s/logs.go +++ b/internal/k8s/logs.go @@ -3,6 +3,7 @@ package k8s import ( "bufio" "context" + "errors" "fmt" "io" "sync" @@ -27,6 +28,10 @@ var ( // limitBytes defines the maximum number of bytes of logs returned from a // single container limitBytes int64 = 1 * 1024 * 1024 // 1MiB + + // ErrConcurrentLogLimit indicates that the maximum number of concurrent log + // sessions has been reached. + ErrConcurrentLogLimit = errors.New("reached concurrent log limit") ) // linewiseCopy reads strings separated by \n from logStream, and writes them @@ -202,9 +207,23 @@ func (c *Client) newPodInformer(ctx context.Context, // follow=false. // 2. ctx is cancelled (signalling that the SSH channel was closed). // 3. An unrecoverable error occurs. -func (c *Client) Logs(ctx context.Context, - namespace, deployment, container string, follow bool, tailLines int64, - stdio io.ReadWriter) error { +// +// If a call to Logs would exceed the configured maximum number of concurrent +// log sessions, ErrConcurrentLogLimit is returned. +func (c *Client) Logs( + ctx context.Context, + namespace, + deployment, + container string, + follow bool, + tailLines int64, + stdio io.ReadWriter, +) error { + // Exit with an error if we have hit the concurrent log limit. + if !c.logSem.TryAcquire(1) { + return ErrConcurrentLogLimit + } + defer c.logSem.Release(1) // Wrap the context so we can cancel subroutines of this function on error. childCtx, cancel := context.WithCancel(ctx) defer cancel() From f51a8996105bd9c66c88792440efd7ef4626dbec Mon Sep 17 00:00:00 2001 From: Scott Leggett Date: Mon, 14 Oct 2024 17:39:59 +0800 Subject: [PATCH 2/4] feat: add log session time limit Default log session time limit of four hours. --- cmd/ssh-portal/serve.go | 20 +++++++++++--------- internal/k8s/client.go | 10 ++++++---- internal/k8s/logs.go | 14 +++++++++++++- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/cmd/ssh-portal/serve.go b/cmd/ssh-portal/serve.go index 339d2766..19586b92 100644 --- a/cmd/ssh-portal/serve.go +++ b/cmd/ssh-portal/serve.go @@ -7,6 +7,7 @@ import ( "net" "os/signal" "syscall" + "time" "github.com/nats-io/nats.go" "github.com/uselagoon/ssh-portal/internal/k8s" @@ -21,14 +22,15 @@ const ( // ServeCmd represents the serve command. type ServeCmd struct { - NATSServer string `kong:"required,env='NATS_URL',help='NATS server URL (nats://... or tls://...)'"` - SSHServerPort uint `kong:"default='2222',env='SSH_SERVER_PORT',help='Port the SSH server will listen on for SSH client connections'"` - HostKeyECDSA string `kong:"env='HOST_KEY_ECDSA',help='PEM encoded ECDSA host key'"` - HostKeyED25519 string `kong:"env='HOST_KEY_ED25519',help='PEM encoded Ed25519 host key'"` - HostKeyRSA string `kong:"env='HOST_KEY_RSA',help='PEM encoded RSA host key'"` - LogAccessEnabled bool `kong:"env='LOG_ACCESS_ENABLED',help='Allow any user who can SSH into a pod to also access its logs'"` - Banner string `kong:"env='BANNER',help='Text sent to remote users before authentication'"` - ConcurrentLogLimit uint `kong:"default='32',env='CONCURRENT_LOG_LIMIT',help='Maximum number of concurrent log sessions'"` + NATSServer string `kong:"required,env='NATS_URL',help='NATS server URL (nats://... or tls://...)'"` + SSHServerPort uint `kong:"default='2222',env='SSH_SERVER_PORT',help='Port the SSH server will listen on for SSH client connections'"` + HostKeyECDSA string `kong:"env='HOST_KEY_ECDSA',help='PEM encoded ECDSA host key'"` + HostKeyED25519 string `kong:"env='HOST_KEY_ED25519',help='PEM encoded Ed25519 host key'"` + HostKeyRSA string `kong:"env='HOST_KEY_RSA',help='PEM encoded RSA host key'"` + LogAccessEnabled bool `kong:"env='LOG_ACCESS_ENABLED',help='Allow any user who can SSH into a pod to also access its logs'"` + Banner string `kong:"env='BANNER',help='Text sent to remote users before authentication'"` + ConcurrentLogLimit uint `kong:"default='32',env='CONCURRENT_LOG_LIMIT',help='Maximum number of concurrent log sessions'"` + LogTimeLimit time.Duration `kong:"default='4h',env='LOG_TIME_LIMIT',help='Maximum lifetime of each logs session'"` } // Run the serve command to handle SSH connection requests. @@ -61,7 +63,7 @@ func (cmd *ServeCmd) Run(log *slog.Logger) error { } defer l.Close() // get kubernetes client - c, err := k8s.NewClient(cmd.ConcurrentLogLimit) + c, err := k8s.NewClient(cmd.ConcurrentLogLimit, cmd.LogTimeLimit) if err != nil { return fmt.Errorf("couldn't create k8s client: %v", err) } diff --git a/internal/k8s/client.go b/internal/k8s/client.go index 593e61af..cb902dbc 100644 --- a/internal/k8s/client.go +++ b/internal/k8s/client.go @@ -26,10 +26,11 @@ type Client struct { clientset kubernetes.Interface logStreamIDs sync.Map logSem *semaphore.Weighted + logTimeLimit time.Duration } // NewClient creates a new kubernetes API client. -func NewClient(concurrentLogLimit uint) (*Client, error) { +func NewClient(concurrentLogLimit uint, logTimeLimit time.Duration) (*Client, error) { // create the in-cluster config config, err := rest.InClusterConfig() if err != nil { @@ -41,8 +42,9 @@ func NewClient(concurrentLogLimit uint) (*Client, error) { return nil, err } return &Client{ - config: config, - clientset: clientset, - logSem: semaphore.NewWeighted(int64(concurrentLogLimit)), + config: config, + clientset: clientset, + logSem: semaphore.NewWeighted(int64(concurrentLogLimit)), + logTimeLimit: logTimeLimit, }, nil } diff --git a/internal/k8s/logs.go b/internal/k8s/logs.go index ccadc0ac..779ca972 100644 --- a/internal/k8s/logs.go +++ b/internal/k8s/logs.go @@ -32,6 +32,9 @@ var ( // ErrConcurrentLogLimit indicates that the maximum number of concurrent log // sessions has been reached. ErrConcurrentLogLimit = errors.New("reached concurrent log limit") + // ErrLogTimeLimit indicates that the maximum log session time has been + // exceeded. + ErrLogTimeLimit = errors.New("exceeded maximum log session time") ) // linewiseCopy reads strings separated by \n from logStream, and writes them @@ -210,6 +213,8 @@ func (c *Client) newPodInformer(ctx context.Context, // // If a call to Logs would exceed the configured maximum number of concurrent // log sessions, ErrConcurrentLogLimit is returned. +// +// If the configured log time limit is exceeded, ErrLogTimeLimit is returned. func (c *Client) Logs( ctx context.Context, namespace, @@ -225,7 +230,8 @@ func (c *Client) Logs( } defer c.logSem.Release(1) // Wrap the context so we can cancel subroutines of this function on error. - childCtx, cancel := context.WithCancel(ctx) + childCtx, cancel := + context.WithTimeoutCause(ctx, c.logTimeLimit, ErrLogTimeLimit) defer cancel() // Generate a requestID value to uniquely distinguish between multiple calls // to this function. This requestID is used in readLogs() to distinguish @@ -272,6 +278,9 @@ func (c *Client) Logs( return fmt.Errorf("couldn't construct new pod informer: %v", err) } podInformer.Run(childCtx.Done()) + if errors.Is(childCtx.Err(), ErrLogTimeLimit) { + return ErrLogTimeLimit + } return nil }) } else { @@ -299,6 +308,9 @@ func (c *Client) Logs( if readLogsErr != nil { return fmt.Errorf("couldn't read logs on existing pods: %v", readLogsErr) } + if errors.Is(childCtx.Err(), context.DeadlineExceeded) { + return ErrLogTimeLimit + } return nil }) } From 3cccef83956c7350e38d626c51642b3533e7a9c6 Mon Sep 17 00:00:00 2001 From: Scott Leggett Date: Mon, 14 Oct 2024 21:20:36 +0800 Subject: [PATCH 3/4] chore: add tests for logs limits * session timeout * concurrent sessions --- internal/k8s/exec_test.go | 2 +- internal/k8s/logs.go | 5 +- internal/k8s/logs_test.go | 105 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 4 deletions(-) diff --git a/internal/k8s/exec_test.go b/internal/k8s/exec_test.go index e92daeab..6976bcf8 100644 --- a/internal/k8s/exec_test.go +++ b/internal/k8s/exec_test.go @@ -153,7 +153,7 @@ func TestIdledDeployLabels(t *testing.T) { t.Run(name, func(tt *testing.T) { // create fake Kubernetes client with test deploys c := &Client{ - clientset: fake.NewSimpleClientset(tc.deploys), + clientset: fake.NewClientset(tc.deploys), } deploys, err := c.idledDeploys(context.Background(), testNS) assert.NoError(tt, err, name) diff --git a/internal/k8s/logs.go b/internal/k8s/logs.go index 779ca972..3b43fb2f 100644 --- a/internal/k8s/logs.go +++ b/internal/k8s/logs.go @@ -230,8 +230,7 @@ func (c *Client) Logs( } defer c.logSem.Release(1) // Wrap the context so we can cancel subroutines of this function on error. - childCtx, cancel := - context.WithTimeoutCause(ctx, c.logTimeLimit, ErrLogTimeLimit) + childCtx, cancel := context.WithTimeout(ctx, c.logTimeLimit) defer cancel() // Generate a requestID value to uniquely distinguish between multiple calls // to this function. This requestID is used in readLogs() to distinguish @@ -278,7 +277,7 @@ func (c *Client) Logs( return fmt.Errorf("couldn't construct new pod informer: %v", err) } podInformer.Run(childCtx.Done()) - if errors.Is(childCtx.Err(), ErrLogTimeLimit) { + if errors.Is(childCtx.Err(), context.DeadlineExceeded) { return ErrLogTimeLimit } return nil diff --git a/internal/k8s/logs_test.go b/internal/k8s/logs_test.go index 010b9ebf..0a2fc26e 100644 --- a/internal/k8s/logs_test.go +++ b/internal/k8s/logs_test.go @@ -1,6 +1,7 @@ package k8s import ( + "bytes" "context" "io" "strings" @@ -8,6 +9,12 @@ import ( "time" "github.com/alecthomas/assert/v2" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" ) func TestLinewiseCopy(t *testing.T) { @@ -44,3 +51,101 @@ func TestLinewiseCopy(t *testing.T) { }) } } + +func TestLogs(t *testing.T) { + testNS := "testns" + testDeploy := "foo" + testPod := "bar" + deploys := &appsv1.DeploymentList{ + Items: []appsv1.Deployment{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: testDeploy, + Namespace: testNS, + Labels: map[string]string{ + "idling.lagoon.sh/watch": "true", + }, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app.kubernetes.io/name": "foo-app", + }, + }, + }, + }, + }, + } + pods := &corev1.PodList{ + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-123xyz", + Namespace: testNS, + Labels: map[string]string{ + "app.kubernetes.io/name": "foo-app", + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: testPod, + }, + }, + }, + }, + }, + } + var testCases = map[string]struct { + follow bool + sessionCount uint + expectError bool + expectedError error + }{ + "no follow": { + sessionCount: 1, + }, + "no follow two sessions": { + sessionCount: 2, + }, + "no follow session count limit exceeded": { + sessionCount: 3, + expectError: true, + expectedError: ErrConcurrentLogLimit, + }, + "follow session timeout": { + follow: true, + sessionCount: 1, + expectError: true, + expectedError: ErrLogTimeLimit, + }, + } + for name, tc := range testCases { + t.Run(name, func(tt *testing.T) { + // create fake Kubernetes client with test deploys + c := &Client{ + clientset: fake.NewClientset(deploys, pods), + logSem: semaphore.NewWeighted(int64(2)), + logTimeLimit: time.Second, + } + // execute test + var buf bytes.Buffer + var eg errgroup.Group + ctx := context.Background() + for range tc.sessionCount { + eg.Go(func() error { + return c.Logs(ctx, testNS, testDeploy, testPod, tc.follow, 10, &buf) + }) + } + // check results + err := eg.Wait() + if tc.expectError { + assert.Error(tt, err, name) + assert.Equal(tt, err, tc.expectedError, name) + } else { + assert.NoError(tt, err, name) + tt.Log(buf.String()) + } + }) + } +} From 243b0391cfb86943c2acdfa96b4afdacfb6ff134 Mon Sep 17 00:00:00 2001 From: Scott Leggett Date: Mon, 14 Oct 2024 21:38:22 +0800 Subject: [PATCH 4/4] feat: export metrics related to new limits --- internal/sshserver/sessionhandler.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/internal/sshserver/sessionhandler.go b/internal/sshserver/sessionhandler.go index da208296..5da1e3b4 100644 --- a/internal/sshserver/sessionhandler.go +++ b/internal/sshserver/sessionhandler.go @@ -28,6 +28,14 @@ var ( Name: "sshportal_sessions_total", Help: "The total number of ssh-portal sessions started", }) + execSessions = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "sshportal_exec_sessions", + Help: "Current number of ssh-portal exec sessions", + }) + logsSessions = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "sshportal_logs_sessions", + Help: "Current number of ssh-portal logs sessions", + }) ) // authCtxValues extracts the context values set by the authhandler. @@ -246,6 +254,9 @@ func startClientKeepalive(ctx context.Context, cancel context.CancelFunc, func doLogs(ctx ssh.Context, log *slog.Logger, s ssh.Session, deployment, container string, follow bool, tailLines int64, c K8SAPIService) { + // update metrics + logsSessions.Inc() + defer logsSessions.Dec() // Wrap the ssh.Context so we can cancel goroutines started from this // function without affecting the SSH session. childCtx, cancel := context.WithCancel(ctx) @@ -280,6 +291,9 @@ func doLogs(ctx ssh.Context, log *slog.Logger, s ssh.Session, deployment, func doExec(ctx ssh.Context, log *slog.Logger, s ssh.Session, deployment, container string, cmd []string, c K8SAPIService, pty bool, winch <-chan ssh.Window) { + // update metrics + execSessions.Inc() + defer execSessions.Dec() err := c.Exec(ctx, s.User(), deployment, container, cmd, s, s.Stderr(), pty, winch) if err != nil {