Skip to content

Commit

Permalink
Adds support for TLS encryption between manager and addons
Browse files Browse the repository at this point in the history
Signed-off-by: Bipul Adhikari <[email protected]>
  • Loading branch information
bipuladh committed Dec 20, 2024
1 parent 7f7bc1c commit dbf8b45
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 19 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ the CSI-nodeplugin containers. The side-car registers itself by creating a
`CSIAddonsNode` CR that the CSI-Addons Controller can use to connect to the
side-car and execute operations.

### Enabling authentication for side-car

When deploying the side-car set `enable-auth` flag to true.
Self signed certificates are generated by the sidecar.
Checks for valid Bearer token on request headers and performs token review.
Sidecar must be deployed with TokenReview create and get access for the associated ServiceAccount.

### `csi-addons` executable

The `csi-addons` executable can be used to call CSI-Addons operations against a
Expand All @@ -64,6 +71,11 @@ By listing the `CSIAddonsNode` CRs, the CSI-Addons Controller knows how to
connect to the side-cars. By checking the supported capabilities of the
side-cars, it can decide where to execute operations that the user requested.

### Enabling authentication for manager

When deploying the manager set `enable-auth` flag to true.
This will use TLS for transport layer and adds Bearer token to request headers.

### Installation

Refer to the [installation guide](docs/deploy-controller.md) for more details.
Expand Down
10 changes: 6 additions & 4 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func main() {
ctx = context.Background()
cfg = util.NewConfig()
tlsOpts []func(*tls.Config)
enableAuth bool
)
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. Use :8443 for HTTPS or :8080 for HTTP, or 0 to disable the metrics service.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -99,6 +100,7 @@ func main() {
flag.BoolVar(&secureMetrics, "metrics-secure", true, "If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
flag.BoolVar(&showVersion, "version", false, "Print Version details")
flag.StringVar(&cfg.SchedulePrecedence, "schedule-precedence", "", "The order of precedence in which schedule of reclaimspace and keyrotation is considered. Possible values are sc-only")
flag.BoolVar(&enableAuth, "enable-auth", false, "Enables TLS and adds bearer token to the headers (disabled by default)")
opts := zap.Options{
Development: true,
TimeEncoder: zapcore.ISO8601TimeEncoder,
Expand Down Expand Up @@ -182,16 +184,16 @@ func main() {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}

connPool := connection.NewConnectionPool()

ctrlOptions := controller.Options{
MaxConcurrentReconciles: cfg.MaxConcurrentReconciles,
}
if err = (&controllers.CSIAddonsNodeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConnPool: connPool,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConnPool: connPool,
EnableAuth: enableAuth,
}).SetupWithManager(mgr, ctrlOptions); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CSIAddonsNode")
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions docs/deploy-controller.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The CSI-Addons Controller can be deployed by different ways:
| `--leader-elect` | `false` | Enable leader election for controller manager. |
| `--reclaim-space-timeout` | `3m` | Timeout for reclaimspace operation |
| `--max-concurrent-reconciles` | 100 | Maximum number of concurrent reconciles |
| `--enable-auth` | true | Enable adding SA tokens to headers and TLS |

> Note: Some of the above configuration options can also be configured using [`"csi-addons-config"` configmap](./csi-addons-config.md).
Expand Down
20 changes: 16 additions & 4 deletions internal/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ package connection

import (
"context"
"crypto/tls"
"time"

"github.com/csi-addons/kubernetes-csi-addons/internal/kubernetes/token"

"github.com/csi-addons/spec/lib/go/identity"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

Expand All @@ -39,10 +43,18 @@ type Connection struct {

// NewConnection establishes connection with sidecar, fetches capability and returns Connection object
// filled with required information.
func NewConnection(ctx context.Context, endpoint, nodeID, driverName, namespace, podName string) (*Connection, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithIdleTimeout(time.Duration(0)),
func NewConnection(ctx context.Context, endpoint, nodeID, driverName, namespace, podName string, enableAuth bool) (*Connection, error) {
var opts []grpc.DialOption
if enableAuth {
opts = append(opts, token.WithServiceAccountToken())
tlsConfig := &tls.Config{
// Certs are only used to initiate HTTPS connections; authorization is handled by SA tokens
InsecureSkipVerify: true,
}
creds := credentials.NewTLS(tlsConfig)
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
cc, err := grpc.NewClient(endpoint, opts...)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/csiaddons/csiaddonsnode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ var (
// CSIAddonsNodeReconciler reconciles a CSIAddonsNode object
type CSIAddonsNodeReconciler struct {
client.Client
Scheme *runtime.Scheme
ConnPool *connection.ConnectionPool
Scheme *runtime.Scheme
ConnPool *connection.ConnectionPool
EnableAuth bool
}

//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
Expand Down Expand Up @@ -126,7 +127,7 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

logger.Info("Connecting to sidecar")
newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Namespace, csiAddonsNode.Name)
newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Namespace, csiAddonsNode.Name, r.EnableAuth)
if err != nil {
logger.Error(err, "Failed to establish connection with sidecar")

Expand Down Expand Up @@ -334,7 +335,6 @@ func (r *CSIAddonsNodeReconciler) resolveEndpoint(ctx context.Context, rawURL st
if err != nil {
return "", "", err
}

pod := &corev1.Pod{}
err = r.Client.Get(ctx, client.ObjectKey{
Namespace: namespace,
Expand Down
164 changes: 164 additions & 0 deletions internal/kubernetes/token/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
Copyright 2024 The Kubernetes-CSI-Addons Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package token

import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"io"
"math/big"
"os"
"strings"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
authv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

const bearerPrefix = "Bearer "
const authorizationKey = "Authorization"

func WithServiceAccountToken() grpc.DialOption {
return grpc.WithUnaryInterceptor(addAuthorizationHeader)
}

func addAuthorizationHeader(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
token, err := getToken()
if err != nil {
return err
}

authCtx := metadata.AppendToOutgoingContext(ctx, authorizationKey, "Bearer "+token)
return invoker(authCtx, method, req, reply, cc, opts...)
}

func getToken() (string, error) {
return readFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
}

func AuthorizationInterceptor(kubeclient kubernetes.Clientset) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if err := authorizeConnection(ctx, kubeclient); err != nil {
return nil, err
}
return handler(ctx, req)
}
}

func authorizeConnection(ctx context.Context, kubeclient kubernetes.Clientset) error {

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return status.Error(codes.Unauthenticated, "missing metadata")
}

authHeader, ok := md["authorization"]
if !ok || len(authHeader) == 0 {
return status.Error(codes.Unauthenticated, "missing authorization token")
}

token := authHeader[0]
isValidated, err := validateBearerToken(ctx, token, kubeclient)
if err != nil {
return err
}
if !isValidated {

return status.Errorf(codes.Unauthenticated, "invalid token")
}
return nil
}

func parseToken(authHeader string) string {
return strings.TrimPrefix(authHeader, bearerPrefix)
}

func validateBearerToken(ctx context.Context, token string, kubeclient kubernetes.Clientset) (bool, error) {
tokenReview := &authv1.TokenReview{
Spec: authv1.TokenReviewSpec{
Token: parseToken(token),
},
}
result, err := kubeclient.AuthenticationV1().TokenReviews().Create(ctx, tokenReview, metav1.CreateOptions{})
if err != nil {
return false, fmt.Errorf("failed to review token %w", err)
}
if result.Status.Authenticated {
return true, nil
}
return false, nil
}

func readFile(filePath string) (string, error) {
file, err := os.Open(filePath)
if err != nil {
return "", err
}
defer file.Close()

data, err := io.ReadAll(file)
if err != nil {
return "", err
}
return string(data), nil
}

// GenerateSelfSignedCert generates a self-signed certificate and key for use in a TLS connection.
func GenerateSelfSignedCert() (tls.Certificate, error) {
// Generate a private key
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return tls.Certificate{}, err
}

template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
Organization: []string{"k8s-addons-sidecar-server"},
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(365 * 24 * time.Hour),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{
x509.ExtKeyUsageServerAuth,
},
IsCA: true,
}

// Self-sign the certificate
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
if err != nil {
return tls.Certificate{}, err
}

// Encode certificate and key into PEM format
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)})

// Load the certificate into a tls.Certificate
return tls.X509KeyPair(certPEM, keyPEM)
}
33 changes: 27 additions & 6 deletions sidecar/internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ limitations under the License.
package server

import (
"crypto/tls"
"errors"
"net"

"github.com/csi-addons/kubernetes-csi-addons/internal/kubernetes/token"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

Expand All @@ -38,15 +43,17 @@ type SidecarServer struct {
// URL components to listen on the tcp port
scheme string
endpoint string
client *k8s.Clientset

server *grpc.Server
services []SidecarService
server *grpc.Server
services []SidecarService
enableAuthChecks bool
}

// NewSidecarServer create a new SidecarServer on the given IP-address and
// port. If the IP-address is an empty string, the server will listen on all
// available IP-addresses. Only tcp ports are supported.
func NewSidecarServer(ip, port string) *SidecarServer {
func NewSidecarServer(ip, port string, client *k8s.Clientset, enableAuthChecks bool) *SidecarServer {
ss := &SidecarServer{}

if ss.services == nil {
Expand All @@ -55,7 +62,8 @@ func NewSidecarServer(ip, port string) *SidecarServer {

ss.scheme = "tcp"
ss.endpoint = ip + ":" + port

ss.client = client
ss.enableAuthChecks = enableAuthChecks
return ss
}

Expand All @@ -69,8 +77,21 @@ func (ss *SidecarServer) RegisterService(svc SidecarService) {
// Init creates the internal gRPC server, and registers the SidecarServices.
// and starts gRPC server.
func (ss *SidecarServer) Start() {
// create the gRPC server and register services
ss.server = grpc.NewServer()
if ss.enableAuthChecks {
cert, err := token.GenerateSelfSignedCert()
if err != nil {
panic("Failed to generate self-signed certificate: " + err.Error())
}

// Create TLS credentials
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
})
// create the gRPC server and register services
ss.server = grpc.NewServer(grpc.UnaryInterceptor(token.AuthorizationInterceptor(*ss.client)), grpc.Creds(creds))
} else {
ss.server = grpc.NewServer()
}

for _, svc := range ss.services {
svc.RegisterService(ss.server)
Expand Down
3 changes: 2 additions & 1 deletion sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func main() {
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.")
enableAuthzChecks = flag.Bool("enable-auth", false, "Enable Authorization checks and TLS communication (disabled by default)")
)
klog.InitFlags(nil)

Expand Down Expand Up @@ -110,7 +111,7 @@ func main() {
klog.Fatalf("Failed to create csiaddonsnode: %v", err)
}

sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort)
sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort, kubeClient, *enableAuthzChecks)
sidecarServer.RegisterService(service.NewIdentityServer(csiClient.GetGRPCClient()))
sidecarServer.RegisterService(service.NewReclaimSpaceServer(csiClient.GetGRPCClient(), kubeClient, *stagingPath))
sidecarServer.RegisterService(service.NewNetworkFenceServer(csiClient.GetGRPCClient(), kubeClient))
Expand Down

0 comments on commit dbf8b45

Please sign in to comment.