From a23925b05f6442572fed1391b3fd3606b8742557 Mon Sep 17 00:00:00 2001 From: Bipul Adhikari Date: Mon, 11 Nov 2024 18:51:07 +0545 Subject: [PATCH] Adds support for TLS and token reviews Signed-off-by: Bipul Adhikari --- cmd/manager/main.go | 10 +- config/manager/manager.yaml | 11 ++ deploy/controller/setup-controller.yaml | 11 ++ internal/connection/connection.go | 28 +++- .../csiaddons/csiaddonsnode_controller.go | 7 +- internal/kubernetes/namespace.go | 36 +++++ internal/kubernetes/token/grpc.go | 142 ++++++++++++++++++ sidecar/internal/server/server.go | 30 +++- sidecar/main.go | 17 ++- 9 files changed, 272 insertions(+), 20 deletions(-) create mode 100644 internal/kubernetes/namespace.go create mode 100644 internal/kubernetes/token/grpc.go diff --git a/cmd/manager/main.go b/cmd/manager/main.go index df218544b..7f47dbce5 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -77,6 +77,7 @@ func main() { enableAdmissionWebhooks bool ctx = context.Background() cfg = util.NewConfig() + enableTLS bool ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -92,6 +93,7 @@ func main() { flag.BoolVar(&enableAdmissionWebhooks, "enable-admission-webhooks", false, "[DEPRECATED] Enable the admission webhooks") flag.BoolVar(&showVersion, "version", false, "Print Version details") flag.StringVar(&cfg.SchedulePrecedence, "schedule-precedence", "", "The order of precedence in which schedule of reclaimspace and keyrotation is considered. Possible values are sc-only") + flag.BoolVar(&enableTLS, "tls", true, "Enable TLS(enabled by default)") opts := zap.Options{ Development: true, TimeEncoder: zapcore.ISO8601TimeEncoder, @@ -145,16 +147,16 @@ func main() { setupLog.Error(err, "unable to start manager") os.Exit(1) } - connPool := connection.NewConnectionPool() ctrlOptions := controller.Options{ MaxConcurrentReconciles: cfg.MaxConcurrentReconciles, } if err = (&controllers.CSIAddonsNodeReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConnPool: connPool, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ConnPool: connPool, + EnableTLS: enableTLS, }).SetupWithManager(mgr, ctrlOptions); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CSIAddonsNode") os.Exit(1) diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index fd1c45f20..d4581381b 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -40,6 +40,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + volumeMounts: + - name: certs + mountPath: /etc/tls + readOnly: true securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true @@ -64,5 +68,12 @@ spec: requests: cpu: 10m memory: 64Mi + volumes: + - name: certs + configMap: + name: openshift-service-ca.crt + items: + - key: service-ca.crt + path: tls.crt serviceAccountName: csi-addons-controller-manager terminationGracePeriodSeconds: 10 diff --git a/deploy/controller/setup-controller.yaml b/deploy/controller/setup-controller.yaml index d602b6024..c3ffe630f 100644 --- a/deploy/controller/setup-controller.yaml +++ b/deploy/controller/setup-controller.yaml @@ -104,7 +104,18 @@ spec: securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true + volumeMounts: + - mountPath: /etc/tls + name: certs + readOnly: true securityContext: runAsNonRoot: true serviceAccountName: csi-addons-controller-manager terminationGracePeriodSeconds: 10 + volumes: + - configMap: + items: + - key: service-ca.crt + path: tls.crt + name: openshift-service-ca.crt + name: certs diff --git a/internal/connection/connection.go b/internal/connection/connection.go index 0b24f0210..559170a17 100644 --- a/internal/connection/connection.go +++ b/internal/connection/connection.go @@ -18,11 +18,15 @@ package connection import ( "context" + "crypto/tls" + "crypto/x509" + "fmt" "time" + "github.com/csi-addons/kubernetes-csi-addons/internal/kubernetes/token" "github.com/csi-addons/spec/lib/go/identity" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/credentials" ) // Connection struct consists of to NodeID, DriverName, Capabilities for the controller @@ -39,11 +43,29 @@ type Connection struct { // NewConnection establishes connection with sidecar, fetches capability and returns Connection object // filled with required information. -func NewConnection(ctx context.Context, endpoint, nodeID, driverName, namespace, podName string) (*Connection, error) { +func NewConnection(ctx context.Context, endpoint, nodeID, driverName, namespace, podName string, enableTLS bool) (*Connection, error) { opts := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithIdleTimeout(time.Duration(0)), } + + opts = append(opts, token.WithServiceAccountToken()) + if enableTLS { + + caFile, caError := token.GetCACert() + if caError != nil { + return nil, (fmt.Errorf("failed to get server cert %v", caError)) + } + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM([]byte(caFile)) { + return nil, fmt.Errorf("failed to append CA cert") + } + tlsConfig := &tls.Config{ + RootCAs: caCertPool, // The CA certificates to verify the server + InsecureSkipVerify: true, + } + creds := credentials.NewTLS(tlsConfig) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } cc, err := grpc.NewClient(endpoint, opts...) if err != nil { return nil, err diff --git a/internal/controller/csiaddons/csiaddonsnode_controller.go b/internal/controller/csiaddons/csiaddonsnode_controller.go index d5f0f763e..4f9a197f2 100644 --- a/internal/controller/csiaddons/csiaddonsnode_controller.go +++ b/internal/controller/csiaddons/csiaddonsnode_controller.go @@ -49,8 +49,9 @@ var ( // CSIAddonsNodeReconciler reconciles a CSIAddonsNode object type CSIAddonsNodeReconciler struct { client.Client - Scheme *runtime.Scheme - ConnPool *connection.ConnectionPool + Scheme *runtime.Scheme + ConnPool *connection.ConnectionPool + EnableTLS bool } //+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch @@ -120,7 +121,7 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques } logger.Info("Connecting to sidecar") - newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Namespace, csiAddonsNode.Name) + newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Namespace, csiAddonsNode.Name, r.EnableTLS) if err != nil { logger.Error(err, "Failed to establish connection with sidecar") diff --git a/internal/kubernetes/namespace.go b/internal/kubernetes/namespace.go new file mode 100644 index 000000000..f47bd4e78 --- /dev/null +++ b/internal/kubernetes/namespace.go @@ -0,0 +1,36 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "io" + "os" +) + +func GetNamespace() (string, error) { + namespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace" + file, err := os.Open(namespaceFile) + if err != nil { + return "", err + } + defer file.Close() + + data, err := io.ReadAll(file) + if err != nil { + return "", err + } + return string(data), nil +} diff --git a/internal/kubernetes/token/grpc.go b/internal/kubernetes/token/grpc.go new file mode 100644 index 000000000..60b6b69ab --- /dev/null +++ b/internal/kubernetes/token/grpc.go @@ -0,0 +1,142 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package token + +import ( + "context" + "fmt" + "io" + "os" + "strings" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + authv1 "k8s.io/api/authentication/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +func WithServiceAccountToken() grpc.DialOption { + return grpc.WithUnaryInterceptor(addAuthorizationHeader) +} + +func addAuthorizationHeader(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + token, err := getToken() + if err != nil { + return err + } + + authCtx := metadata.AppendToOutgoingContext(ctx, "Authorization", "Bearer "+token) + return invoker(authCtx, method, req, reply, cc, opts...) +} + +func getToken() (string, error) { + + const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" + + file, err := os.Open(tokenPath) + if err != nil { + return "", err + } + defer file.Close() + + data, err := io.ReadAll(file) + if err != nil { + return "", err + } + return string(data), nil +} + +func AuthorizationInterceptor(kubeclient kubernetes.Clientset) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if err := authorizeConnection(ctx, kubeclient); err != nil { + return nil, err + } + return handler(ctx, req) + } +} + +func authorizeConnection(ctx context.Context, kubeclient kubernetes.Clientset) error { + + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return status.Errorf(codes.Unauthenticated, "missing metadata") + } + + authHeader, ok := md["authorization"] + if !ok || len(authHeader) == 0 { + return status.Errorf(codes.Unauthenticated, "missing authorization token") + } + + token := authHeader[0] + isValidated, err := validateBearerToken(ctx, token, kubeclient) + if !isValidated || (err != nil) { + fmt.Printf("Error after validation %v", err) + return status.Errorf(codes.Unauthenticated, "invalid token") + } + return nil +} + +func removeBearer(authHeader string) string { + // Check if the Authorization header starts with "Bearer" + if strings.HasPrefix(authHeader, "Bearer ") { + // Remove the "Bearer " part and return the token + return strings.TrimPrefix(authHeader, "Bearer ") + } + // If it doesn't start with "Bearer", return the original header + return authHeader +} + +func validateBearerToken(ctx context.Context, token string, kubeclient kubernetes.Clientset) (bool, error) { + tokenReview := &authv1.TokenReview{ + Spec: authv1.TokenReviewSpec{ + Token: removeBearer(token), + }, + } + result, err := kubeclient.AuthenticationV1().TokenReviews().Create(ctx, tokenReview, metav1.CreateOptions{}) + fmt.Printf("result %v", result) + fmt.Printf("token: %v", token) + if err != nil { + fmt.Printf("err in k8s client: %v", err) + return false, fmt.Errorf("failed to review token %v", err) + } + + if result.Status.Authenticated { + return true, nil + } + return false, nil +} + +func readFile(filePath string) (string, error) { + file, err := os.Open(filePath) + if err != nil { + return "", err + } + defer file.Close() + + data, err := io.ReadAll(file) + if err != nil { + return "", err + } + return string(data), nil +} + +func GetCACert() (string, error) { + caCertFile := "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + return readFile(caCertFile) +} diff --git a/sidecar/internal/server/server.go b/sidecar/internal/server/server.go index 112554683..f859b3b45 100644 --- a/sidecar/internal/server/server.go +++ b/sidecar/internal/server/server.go @@ -18,9 +18,13 @@ package server import ( "errors" + "fmt" "net" + "github.com/csi-addons/kubernetes-csi-addons/internal/kubernetes/token" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + k8s "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" ) @@ -38,15 +42,17 @@ type SidecarServer struct { // URL components to listen on the tcp port scheme string endpoint string + client k8s.Clientset - server *grpc.Server - services []SidecarService + server *grpc.Server + services []SidecarService + enableTLS bool } // NewSidecarServer create a new SidecarServer on the given IP-address and // port. If the IP-address is an empty string, the server will listen on all // available IP-addresses. Only tcp ports are supported. -func NewSidecarServer(ip, port string) *SidecarServer { +func NewSidecarServer(ip string, port int, client k8s.Clientset, enableTLS bool) *SidecarServer { ss := &SidecarServer{} if ss.services == nil { @@ -54,8 +60,9 @@ func NewSidecarServer(ip, port string) *SidecarServer { } ss.scheme = "tcp" - ss.endpoint = ip + ":" + port - + ss.endpoint = ip + ":" + fmt.Sprint(port) + ss.client = client + ss.enableTLS = enableTLS return ss } @@ -69,8 +76,17 @@ func (ss *SidecarServer) RegisterService(svc SidecarService) { // Init creates the internal gRPC server, and registers the SidecarServices. // and starts gRPC server. func (ss *SidecarServer) Start() { - // create the gRPC server and register services - ss.server = grpc.NewServer() + if ss.enableTLS { + creds, err := credentials.NewServerTLSFromFile("/etc/tls/tls.crt", "/etc/tls/tls.key") + if err != nil { + klog.Fatalf("Could not find TLS file: %v", err) + } + // create the gRPC server and register services + ss.server = grpc.NewServer(grpc.UnaryInterceptor(token.AuthorizationInterceptor(ss.client)), grpc.Creds(creds)) + } + if !ss.enableTLS { + ss.server = grpc.NewServer(grpc.UnaryInterceptor(token.AuthorizationInterceptor(ss.client))) + } for _, svc := range ss.services { svc.RegisterService(ss.server) diff --git a/sidecar/main.go b/sidecar/main.go index 24fd3fe84..8775d39b4 100644 --- a/sidecar/main.go +++ b/sidecar/main.go @@ -19,6 +19,7 @@ package main import ( "context" "flag" + "fmt" "time" "github.com/csi-addons/kubernetes-csi-addons/internal/sidecar/service" @@ -43,19 +44,21 @@ func main() { csiAddonsAddress = flag.String("csi-addons-address", "/run/csi-addons/socket", "CSI Addons endopoint") nodeID = flag.String("node-id", "", "NodeID") stagingPath = flag.String("stagingpath", defaultStagingPath, "stagingpath") - controllerPort = flag.String("controller-port", "", + controllerPort = flag.Int("controller-port", 0, "The TCP network port where the gRPC server for controller request, will listen (example: `8080`)") controllerIP = flag.String("controller-ip", "", "The TCP network ip address where the gRPC server for controller request, will listen (example: `192.168.61.228`)") podName = flag.String("pod", "", "name of the Pod that contains this sidecar") podNamespace = flag.String("namespace", "", "namespace of the Pod that contains this sidecar") podUID = flag.String("pod-uid", "", "UID of the Pod that contains this sidecar") + proxyPort = flag.Int("proxy-port", 0, "The TCP port that is available for outside connections to the gRPC server (example: `9070`)") showVersion = flag.Bool("version", false, "Print Version details") leaderElectionNamespace = flag.String("leader-election-namespace", "", "The namespace where the leader election resource exists. Defaults to the pod namespace if not set.") leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") + enableTLS = flag.Bool("tls", true, "Enable TLS(enabled by default)") ) klog.InitFlags(nil) @@ -70,7 +73,15 @@ func main() { return } - controllerEndpoint, err := sideutil.BuildEndpointURL(*controllerIP, *controllerPort, *podName, *podNamespace) + // if proxyPort is set, use that in the CSIAddonsNode.Endpoint URL + publicPort := *proxyPort + publicIP := "" + if publicPort == 0 { + publicPort = *controllerPort + publicIP = *controllerIP + } + + controllerEndpoint, err := sideutil.BuildEndpointURL(publicIP, fmt.Sprint(publicPort), *podName, *podNamespace) if err != nil { klog.Fatalf("Failed to validate controller endpoint: %v", err) } @@ -110,7 +121,7 @@ func main() { klog.Fatalf("Failed to create csiaddonsnode: %v", err) } - sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort) + sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort, *kubeClient, *enableTLS) sidecarServer.RegisterService(service.NewIdentityServer(csiClient.GetGRPCClient())) sidecarServer.RegisterService(service.NewReclaimSpaceServer(csiClient.GetGRPCClient(), kubeClient, *stagingPath)) sidecarServer.RegisterService(service.NewNetworkFenceServer(csiClient.GetGRPCClient(), kubeClient))