Skip to content

Commit

Permalink
cephfs: enable VolumeCondition with new health-checker
Browse files Browse the repository at this point in the history
The HealthChecker is configured to use the Staging path pf the volume,
with a `.csi/` subdirectory. In the future this directory could be a
directory that is not under the Published directory.

Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Oct 17, 2023
1 parent e1eee14 commit 40a8334
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
2 changes: 2 additions & 0 deletions internal/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -82,6 +83,7 @@ func NewNodeServer(
VolumeLocks: util.NewVolumeLocks(),
kernelMountOptions: kernelMountOptions,
fuseMountOptions: fuseMountOptions,
healthChecker: hc.NewHealthCheckManager(),
}
}

Expand Down
59 changes: 58 additions & 1 deletion internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
Expand All @@ -47,6 +48,7 @@ type NodeServer struct {
VolumeLocks *util.VolumeLocks
kernelMountOptions string
fuseMountOptions string
healthChecker hc.Manager
}

func getCredentialsForVolume(
Expand Down Expand Up @@ -228,6 +230,11 @@ func (ns *NodeServer) NodeStageVolume(
return nil, status.Error(codes.Internal, err.Error())
}

hcDir, err := getHealthCheckPath(stagingTargetPath)
if err != nil {
ns.healthChecker.StartChecker(hcDir)
}

return &csi.NodeStageVolumeResponse{}, nil
}

Expand Down Expand Up @@ -270,6 +277,11 @@ func (ns *NodeServer) NodeStageVolume(
}
}

hcDir, err := getHealthCheckPath(stagingTargetPath)
if err != nil {
ns.healthChecker.StartChecker(hcDir)
}

return &csi.NodeStageVolumeResponse{}, nil
}

Expand Down Expand Up @@ -608,6 +620,11 @@ func (ns *NodeServer) NodeUnstageVolume(

stagingTargetPath := req.GetStagingTargetPath()

hcDir, err := getHealthCheckPath(stagingTargetPath)
if err != nil {
ns.healthChecker.StopChecker(hcDir)
}

if err = fsutil.RemoveNodeStageMountinfo(fsutil.VolumeID(volID)); err != nil {
log.ErrorLog(ctx, "cephfs: failed to remove NodeStageMountinfo for volume %s: %v", volID, err)

Expand Down Expand Up @@ -670,6 +687,13 @@ func (ns *NodeServer) NodeGetCapabilities(
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Expand All @@ -694,6 +718,21 @@ func (ns *NodeServer) NodeGetVolumeStats(
return nil, status.Error(codes.InvalidArgument, err.Error())
}

// health check first, return without stats if unhealthy
hcDir, err := getHealthCheckPath(req.GetStagingTargetPath())
if err == nil {
healthy, msg := ns.healthChecker.IsHealthy(hcDir)
if !healthy {
return &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: msg.Error(),
},
}, nil
}
}

// warning: stat() may hang on an unhealthy volume
stat, err := os.Stat(targetPath)
if err != nil {
if util.IsCorruptedMountError(err) {
Expand All @@ -711,8 +750,26 @@ func (ns *NodeServer) NodeGetVolumeStats(
}

if stat.Mode().IsDir() {
return csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
res, err := csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
if err != nil {
return nil, err
}

return res, nil
}

return nil, status.Errorf(codes.InvalidArgument, "targetpath %q is not a directory or device", targetPath)
}

func getHealthCheckPath(stagingTargetPath string) (string, error) {
dir := path.Join(stagingTargetPath, ".csi")
err := os.Mkdir(dir, 0700)

// not an error if the directory exists, allowing the error remoes the
// need for a os.Stat() before creating the directory
if err != nil && !os.IsExist(err) {
return "", err
}

return dir, nil
}

0 comments on commit 40a8334

Please sign in to comment.