Skip to content

Commit

Permalink
cephfs: enable VolumeCondition with new health-checker
Browse files Browse the repository at this point in the history
The HealthChecker is configured to use the Staging path pf the volume,
with a `.csi/` subdirectory. In the future this directory could be a
directory that is not under the Published directory.

Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Oct 26, 2023
1 parent 4d4614c commit c30060a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
2 changes: 2 additions & 0 deletions internal/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -82,6 +83,7 @@ func NewNodeServer(
VolumeLocks: util.NewVolumeLocks(),
kernelMountOptions: kernelMountOptions,
fuseMountOptions: fuseMountOptions,
healthChecker: hc.NewHealthCheckManager(),
}
}

Expand Down
40 changes: 37 additions & 3 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
Expand All @@ -47,6 +48,7 @@ type NodeServer struct {
VolumeLocks *util.VolumeLocks
kernelMountOptions string
fuseMountOptions string
healthChecker hc.Manager
}

func getCredentialsForVolume(
Expand Down Expand Up @@ -270,6 +272,9 @@ func (ns *NodeServer) NodeStageVolume(
}
}

err = ns.healthChecker.StartChecker(req.GetVolumeId(), stagingTargetPath)
log.WarningLog(ctx, "failed to start healtchecker: %v", err)

return &csi.NodeStageVolumeResponse{}, nil
}

Expand Down Expand Up @@ -479,7 +484,8 @@ func (ns *NodeServer) NodePublishVolume(

// Ensure staging target path is a mountpoint.

if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil {
isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)

return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -491,7 +497,7 @@ func (ns *NodeServer) NodePublishVolume(

// Check if the volume is already mounted

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
isMnt, err = util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)

Expand Down Expand Up @@ -545,6 +551,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)
Expand Down Expand Up @@ -599,6 +606,9 @@ func (ns *NodeServer) NodeUnstageVolume(
}

volID := req.GetVolumeId()

ns.healthChecker.StopChecker(volID)

if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)

Expand Down Expand Up @@ -670,6 +680,13 @@ func (ns *NodeServer) NodeGetCapabilities(
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Expand All @@ -694,6 +711,18 @@ func (ns *NodeServer) NodeGetVolumeStats(
return nil, status.Error(codes.InvalidArgument, err.Error())
}

// health check first, return without stats if unhealthy
healthy, msg := ns.healthChecker.IsHealthy(req.GetVolumeId())
if !healthy {
return &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: msg.Error(),
},
}, nil
}

// warning: stat() may hang on an unhealthy volume
stat, err := os.Stat(targetPath)
if err != nil {
if util.IsCorruptedMountError(err) {
Expand All @@ -711,7 +740,12 @@ func (ns *NodeServer) NodeGetVolumeStats(
}

if stat.Mode().IsDir() {
return csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
res, err := csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
if err != nil {
return nil, err
}

return res, nil
}

return nil, status.Errorf(codes.InvalidArgument, "targetpath %q is not a directory or device", targetPath)
Expand Down

0 comments on commit c30060a

Please sign in to comment.