From c30060aac8159ba27e0170b24edacd96999f3f13 Mon Sep 17 00:00:00 2001
From: Niels de Vos
Date: Wed, 13 Sep 2023 16:05:27 +0200
Subject: [PATCH] cephfs: enable VolumeCondition with new health-checker

The HealthChecker is configured to use the Staging path of the volume,
with a `.csi/` subdirectory. In the future this directory could be a
directory that is not under the Published directory.

Signed-off-by: Niels de Vos
---
 internal/cephfs/driver.go     |  2 ++
 internal/cephfs/nodeserver.go | 40 ++++++++++++++++++++++++++++++++---
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go
index d134fc048256..2d39133c8246 100644
--- a/internal/cephfs/driver.go
+++ b/internal/cephfs/driver.go
@@ -25,6 +25,7 @@ import (
 	casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
 	csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
 	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
+	hc "github.com/ceph/ceph-csi/internal/health-checker"
 	"github.com/ceph/ceph-csi/internal/journal"
 	"github.com/ceph/ceph-csi/internal/util"
 	"github.com/ceph/ceph-csi/internal/util/log"
@@ -82,6 +83,7 @@ func NewNodeServer(
 		VolumeLocks:        util.NewVolumeLocks(),
 		kernelMountOptions: kernelMountOptions,
 		fuseMountOptions:   fuseMountOptions,
+		healthChecker:      hc.NewHealthCheckManager(),
 	}
 }
 
diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go
index 7287cde7605b..1cf73121dfe1 100644
--- a/internal/cephfs/nodeserver.go
+++ b/internal/cephfs/nodeserver.go
@@ -29,6 +29,7 @@ import (
 	"github.com/ceph/ceph-csi/internal/cephfs/store"
 	fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
 	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
+	hc "github.com/ceph/ceph-csi/internal/health-checker"
 	"github.com/ceph/ceph-csi/internal/util"
 	"github.com/ceph/ceph-csi/internal/util/fscrypt"
 	"github.com/ceph/ceph-csi/internal/util/log"
@@ -47,6 +48,7 @@ type NodeServer struct {
 	VolumeLocks        *util.VolumeLocks
 	kernelMountOptions string
 	fuseMountOptions   string
+	healthChecker      hc.Manager
 }
 
 func getCredentialsForVolume(
@@ -270,6 +272,9 @@ func (ns *NodeServer) NodeStageVolume(
 		}
 	}
 
+	err = ns.healthChecker.StartChecker(req.GetVolumeId(), stagingTargetPath)
+	log.WarningLog(ctx, "failed to start healthchecker: %v", err)
+
 	return &csi.NodeStageVolumeResponse{}, nil
 }
 
@@ -479,7 +484,8 @@ func (ns *NodeServer) NodePublishVolume(
 
 	// Ensure staging target path is a mountpoint.
 
-	if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil {
+	isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath)
+	if err != nil {
 		log.ErrorLog(ctx, "stat failed: %v", err)
 
 		return nil, status.Error(codes.Internal, err.Error())
@@ -491,7 +497,7 @@ func (ns *NodeServer) NodePublishVolume(
 
 	// Check if the volume is already mounted
 
-	isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
+	isMnt, err = util.IsMountPoint(ns.Mounter, targetPath)
 	if err != nil {
 		log.ErrorLog(ctx, "stat failed: %v", err)
 
@@ -545,6 +551,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
 	// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
 	// at same time, an explicit locking at time of nodeunpublish is not required.
 	targetPath := req.GetTargetPath()
+
 	isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
 	if err != nil {
 		log.ErrorLog(ctx, "stat failed: %v", err)
@@ -599,6 +606,9 @@ func (ns *NodeServer) NodeUnstageVolume(
 	}
 
 	volID := req.GetVolumeId()
+
+	ns.healthChecker.StopChecker(volID)
+
 	if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
 		log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)
 
@@ -670,6 +680,13 @@ func (ns *NodeServer) NodeGetCapabilities(
 				},
 			},
 		},
+		{
+			Type: &csi.NodeServiceCapability_Rpc{
+				Rpc: &csi.NodeServiceCapability_RPC{
+					Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
+				},
+			},
+		},
 		{
 			Type: &csi.NodeServiceCapability_Rpc{
 				Rpc: &csi.NodeServiceCapability_RPC{
@@ -694,6 +711,18 @@ func (ns *NodeServer) NodeGetVolumeStats(
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
+	// health check first, return without stats if unhealthy
+	healthy, msg := ns.healthChecker.IsHealthy(req.GetVolumeId())
+	if !healthy {
+		return &csi.NodeGetVolumeStatsResponse{
+			VolumeCondition: &csi.VolumeCondition{
+				Abnormal: true,
+				Message:  msg.Error(),
+			},
+		}, nil
+	}
+
+	// warning: stat() may hang on an unhealthy volume
 	stat, err := os.Stat(targetPath)
 	if err != nil {
 		if util.IsCorruptedMountError(err) {
@@ -711,7 +740,12 @@ func (ns *NodeServer) NodeGetVolumeStats(
 	}
 
 	if stat.Mode().IsDir() {
-		return csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
+		res, err := csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
+		if err != nil {
+			return nil, err
+		}
+
+		return res, nil
 	}
 
 	return nil, status.Errorf(codes.InvalidArgument, "targetpath %q is not a directory or device", targetPath)
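
The diff above only exercises the new health-checker at its call sites; the internal/health-checker
package itself is not part of this patch. For orientation, the following is a minimal, hypothetical
sketch of a Manager that would satisfy those call sites (NewHealthCheckManager, StartChecker,
StopChecker, and IsHealthy returning a boolean plus an error that becomes the VolumeCondition
message). The package name, the checkManager type, the one-minute probe interval and the
write-then-remove marker file are illustrative assumptions, not the actual ceph-csi implementation;
the only detail taken from the commit message is that the checker works in a `.csi/` subdirectory
of the staging path.

package healthchecker

import (
	"os"
	"path/filepath"
	"sync"
	"time"
)

// Manager is the contract implied by the call sites in the patch:
// one checker per volume ID, started on staging and stopped on unstaging.
type Manager interface {
	StartChecker(volumeID, path string) error
	StopChecker(volumeID string)
	// IsHealthy returns false plus a non-nil error describing the problem.
	IsHealthy(volumeID string) (bool, error)
}

// checkManager is a hypothetical in-memory implementation for illustration.
type checkManager struct {
	mu      sync.Mutex
	lastErr map[string]error
	stop    map[string]chan struct{}
}

// NewHealthCheckManager returns a Manager with no active checkers.
func NewHealthCheckManager() Manager {
	return &checkManager{
		lastErr: map[string]error{},
		stop:    map[string]chan struct{}{},
	}
}

// StartChecker periodically probes a ".csi/" subdirectory of the staging
// path (per the commit message) by creating and removing a marker file.
func (m *checkManager) StartChecker(volumeID, path string) error {
	dir := filepath.Join(path, ".csi")
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}

	stop := make(chan struct{})
	m.mu.Lock()
	m.stop[volumeID] = stop
	m.mu.Unlock()

	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
				probe := filepath.Join(dir, "health-check")
				err := os.WriteFile(probe, []byte(time.Now().String()), 0o644)
				if err == nil {
					err = os.Remove(probe)
				}
				m.mu.Lock()
				m.lastErr[volumeID] = err
				m.mu.Unlock()
			}
		}
	}()

	return nil
}

// StopChecker stops and forgets the checker for the volume, if one exists.
func (m *checkManager) StopChecker(volumeID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ch, ok := m.stop[volumeID]; ok {
		close(ch)
		delete(m.stop, volumeID)
		delete(m.lastErr, volumeID)
	}
}

// IsHealthy reports the result of the most recent probe.
func (m *checkManager) IsHealthy(volumeID string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if err := m.lastErr[volumeID]; err != nil {
		return false, err
	}

	return true, nil
}

With NodeServiceCapability_RPC_VOLUME_CONDITION advertised, the container orchestrator can poll
NodeGetVolumeStats and act on the Abnormal flag; running the health check before os.Stat() matters
because, as the added comment in the last hunk notes, stat() may hang on an unhealthy volume.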