CephFS: prevent hanging NodeGetVolumeStats on stat() syscall when an MDS is slow #4200

Merged: 4 commits, Nov 3, 2023
25 changes: 25 additions & 0 deletions docs/design/proposals/volume-condition.md
@@ -0,0 +1,25 @@
# Support for CSI `VolumeCondition` aka Volume Health Checker

## health-checker API

The Manager for health-checking is implemented under `internal/health-checker`.
The Manager can start a checking process for a given path, return the
(un)healthy state, and stop the checking process when the volume is no longer
needed.

The Manager is responsible for creating a suitable checker for the requested
path. If the path is a block-device, the BlockChecker should be created. For a
filesystem path (directory), the FileChecker is appropriate.
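
For reference, a minimal sketch of the Manager API as the CephFS node-server
uses it later in this PR; the method set mirrors the calls visible in the
diffs below, while `CheckerType` and the exact signatures are assumptions:

```go
package healthchecker

// CheckerType selects the kind of checker to create (for example
// StatCheckerType or FileCheckerType). The concrete definition is an
// assumption for this sketch.
type CheckerType string

// Manager as it is consumed by the CephFS NodeServer in this PR.
type Manager interface {
	// StartSharedChecker starts a checker on the stagingTargetPath that
	// can be shared by all publishes of the volume.
	StartSharedChecker(volumeID, path string, ct CheckerType) error

	// StartChecker starts a checker for a single published path.
	StartChecker(volumeID, path string, ct CheckerType) error

	// StopChecker stops the checker for a published path.
	StopChecker(volumeID, path string)

	// StopSharedChecker stops the shared checker of the volume.
	StopSharedChecker(volumeID string)

	// IsHealthy returns the current condition; a non-nil error together
	// with healthy == true indicates that no checker is running for the
	// path.
	IsHealthy(volumeID, path string) (bool, error)
}
```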

## CephFS

The health-checker writes to the file `csi-volume-condition.ts` in the root of
the volume. This file contains a JSON-formatted timestamp.
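
A minimal sketch of what writing that timestamp could look like; only the file
name and the JSON-formatted timestamp come from the proposal, the struct and
field name below are assumptions:

```go
package healthchecker

import (
	"encoding/json"
	"os"
	"path/filepath"
	"time"
)

// volumeCondition is an assumed layout for the JSON-formatted timestamp.
type volumeCondition struct {
	Timestamp time.Time `json:"timestamp"`
}

// writeTimestamp refreshes the timestamp file in the root (or data/
// subdirectory) of the volume, proving that the filesystem accepts writes.
func writeTimestamp(volumeRoot string) error {
	data, err := json.Marshal(volumeCondition{Timestamp: time.Now()})
	if err != nil {
		return err
	}

	return os.WriteFile(filepath.Join(volumeRoot, "csi-volume-condition.ts"), data, 0o600)
}
```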

A new `data` directory is introduced for newly created volumes. During the
`NodeStageVolume` call the root of the volume is mounted, and the `data`
directory is bind-mounted inside the container when `NodePublishVolume` is
called.

The `data` directory makes it possible to place Ceph-CSI internal files in the
root of the volume, without the user/application having access to them.

Comment on lines +19 to +20

Collaborator:

Will there be any problem for static PVCs, or for existing PVCs that already
have a `data` folder created in them? I don't think so, but checking.

Member Author:

Existing PVCs would not have the option in the VolumeContext, so there will be
a fall-back mechanism.

The current implementation uses a hidden `.csi/` directory in the root of the
volume, which is basically what is used for volumes that do not have the extra
(coming) option in the VolumeContext.
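
The `NodePublishVolume` side of the `data` directory is not part of the diffs
in this PR. A minimal sketch of the bind-mount described in the proposal,
assuming a `k8s.io/mount-utils` style mounter and an illustrative helper name:

```go
package cephfs

import (
	"path/filepath"

	mount "k8s.io/mount-utils"
)

// bindMountDataDir bind-mounts the volume's data/ subdirectory from the
// staging path onto the publish target path, so the application only sees
// data/ and not the Ceph-CSI internal files in the volume root.
// Hypothetical helper for illustration only.
func bindMountDataDir(m mount.Interface, stagingTargetPath, targetPath string) error {
	dataPath := filepath.Join(stagingTargetPath, "data")

	return m.Mount(dataPath, targetPath, "", []string{"bind", "_netdev"})
}
```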
2 changes: 2 additions & 0 deletions internal/cephfs/driver.go
@@ -25,6 +25,7 @@ import (
casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@@ -82,6 +83,7 @@ func NewNodeServer(
VolumeLocks: util.NewVolumeLocks(),
kernelMountOptions: kernelMountOptions,
fuseMountOptions: fuseMountOptions,
healthChecker: hc.NewHealthCheckManager(),
}
}

67 changes: 65 additions & 2 deletions internal/cephfs/nodeserver.go
@@ -29,6 +29,7 @@ import (
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
@@ -47,6 +48,7 @@ type NodeServer struct {
VolumeLocks *util.VolumeLocks
kernelMountOptions string
fuseMountOptions string
healthChecker hc.Manager
}

func getCredentialsForVolume(
@@ -228,6 +230,8 @@ func (ns *NodeServer) NodeStageVolume(
return nil, status.Error(codes.Internal, err.Error())
}

ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)

return &csi.NodeStageVolumeResponse{}, nil
}

@@ -270,9 +274,24 @@ func (ns *NodeServer) NodeStageVolume(
}
}

ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)

return &csi.NodeStageVolumeResponse{}, nil
}

// startSharedHealthChecker starts a health-checker on the stagingTargetPath.
// This checker can be shared between multiple containers.
//
// TODO: start a FileChecker for read-writable volumes that have an app-data subdir.
func (ns *NodeServer) startSharedHealthChecker(ctx context.Context, volumeID, dir string) {
// The StatChecker works for volumes that do not have a dedicated app-data
// subdirectory, or are read-only.
err := ns.healthChecker.StartSharedChecker(volumeID, dir, hc.StatCheckerType)
if err != nil {
log.WarningLog(ctx, "failed to start healthchecker: %v", err)
}
}

func (ns *NodeServer) mount(
ctx context.Context,
mnt mounter.VolumeMounter,
@@ -479,7 +498,8 @@ func (ns *NodeServer) NodePublishVolume(

// Ensure staging target path is a mountpoint.

if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil {
isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)

return nil, status.Error(codes.Internal, err.Error())
@@ -491,7 +511,7 @@

// Check if the volume is already mounted

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
isMnt, err = util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)

@@ -545,6 +565,10 @@ func (ns *NodeServer) NodeUnpublishVolume(
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()

// stop the health-checker that may have been started in NodeGetVolumeStats()
ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)
@@ -599,6 +623,9 @@ func (ns *NodeServer) NodeUnstageVolume(
}

volID := req.GetVolumeId()

ns.healthChecker.StopSharedChecker(volID)

if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)

@@ -670,6 +697,13 @@ func (ns *NodeServer) NodeGetCapabilities(
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
@@ -694,6 +728,35 @@ func (ns *NodeServer) NodeGetVolumeStats(
return nil, status.Error(codes.InvalidArgument, err.Error())
}

// health check first, return without stats if unhealthy
healthy, msg := ns.healthChecker.IsHealthy(req.GetVolumeId(), targetPath)

// If healthy and an error is returned, it means that the checker was not
// started. This could happen when the node-plugin was restarted and the
// volume is already staged and published.
if healthy && msg != nil {
// Start a StatChecker for the mounted targetPath, this prevents
// writing a file in the user-visible location. Ideally a (shared)
Collaborator:

`yhis` to `this`

// FileChecker is started with the stagingTargetPath, but we can't
// get the stagingPath from the request easily.
// TODO: resolve the stagingPath like rbd.getStagingPath() does
err = ns.healthChecker.StartChecker(req.GetVolumeId(), targetPath, hc.StatCheckerType)
Collaborator:

Can we use something like the following to get the stagingPath from the
targetPath?

func (ns *NodeServer) getStagingPath(volPath string) (string, error) {
	mounts, err := ns.Mounter.GetMountRefs(volPath)
	if err != nil {
		return "", err
	}

	for _, mount := range mounts {
		// strip the last directory from the staging path
		stp := strings.Split(mount, "/")
		stagingTargetPath := strings.Join(stp[:len(stp)-1], "/")
		if checkRBDImageMetadataStashExists(stagingTargetPath) {
			return stagingTargetPath, nil
		}
	}

	return "", fmt.Errorf("failed to get staging path for volume %s", volPath)
}


Member Author:

Something like this could be an enhancement. I am not sure if it is worth the
effort and additional complexity though.

When a FileChecker is used, it would make more sense, as that could detect the
existence of the file holding the timestamp.

Member Author:

Adding a TODO comment with a reference to rbd.getStagingPath().

Collaborator:

I also see a very small race for the case below:

  • During NodeUnpublish, cephcsi stops the health checker.
  • Before the umount operation, a NodeGetVolumeStats RPC call comes in, which in turn starts a health checker.
  • Now we have a stale health checker which will only be cleaned up during NodeUnstage (I think).

Is it possible to have such a case?

Member Author:

I do not expect the CO to request NodeUnpublish and then still use the same
publishTargetPath in a later NodeGetVolumeStats. If a CO does that, then yes,
there would be a stale health checker (if it was started in a
NodeGetVolumeStats call, and not in NodeStageVolume).

if err != nil {
log.WarningLog(ctx, "failed to start healthchecker: %v", err)
}
}

// !healthy indicates a problem with the volume
if !healthy {
return &csi.NodeGetVolumeStatsResponse{
Collaborator:

What happens for already running workloads if cephcsi is upgraded to this
version?

Member Author:

Nothing. The VolumeCondition is not used anywhere directly. Kubernetes can
report it, but does not do so by default (yet).

Collaborator:

What I meant was: if cephcsi is upgraded, we might return the volume as
unhealthy for already mounted PVCs, because the health checker only gets
started during NodeStageVolume. The same question applies to a cephcsi
restart. IsHealthy returns an error if there is no health checker found.

Member Author:

Right, tracked in #4219.

Member Author:

This is actually addressed in the latest revision of this PR too.

VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: msg.Error(),
},
}, nil
}

// warning: stat() may hang on an unhealthy volume
stat, err := os.Stat(targetPath)
if err != nil {
if util.IsCorruptedMountError(err) {
6 changes: 6 additions & 0 deletions internal/csi-common/utils.go
@@ -312,6 +312,12 @@ func FilesystemNodeGetVolumeStats(
})
}

// include marker for a healthy volume by default
res.VolumeCondition = &csi.VolumeCondition{
Abnormal: false,
Message: "volume is in a healthy condition",
}

return res, nil
}
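
With the default condition above, every successful response from
FilesystemNodeGetVolumeStats now carries a VolumeCondition. As an illustration
(not part of this PR), a CSI client could consume it like this, assuming the
standard CSI Go bindings:

```go
package csiclient

import (
	"context"
	"log"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

// reportVolumeCondition reads the VolumeCondition returned by
// NodeGetVolumeStats and logs when the volume is reported as abnormal.
func reportVolumeCondition(ctx context.Context, nc csi.NodeClient, volumeID, volumePath string) error {
	resp, err := nc.NodeGetVolumeStats(ctx, &csi.NodeGetVolumeStatsRequest{
		VolumeId:   volumeID,
		VolumePath: volumePath,
	})
	if err != nil {
		return err
	}

	if cond := resp.GetVolumeCondition(); cond != nil && cond.GetAbnormal() {
		log.Printf("volume %s is unhealthy: %s", volumeID, cond.GetMessage())
	}

	return nil
}
```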

107 changes: 107 additions & 0 deletions internal/health-checker/checker.go
@@ -0,0 +1,107 @@
/*
Copyright 2023 ceph-csi authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthchecker

import (
"fmt"
"sync"
"time"
)

// command is what is sent through the channel to terminate the go routine.
type command string

const (
// stopCommand is sent through the channel to stop checking.
stopCommand = command("STOP")
)

type checker struct {
// interval contains the time to sleep between health checks.
interval time.Duration

// timeout contains the extra delay allowed on top of interval before a
// check is considered stale (see isHealthy(): interval + timeout)
timeout time.Duration

// mutex protects against concurrent access to healthy, err and
// lastUpdate
mutex *sync.RWMutex

// current status
isRunning bool
healthy bool
err error
lastUpdate time.Time

// commands is the channel to read commands from; when to stop.
commands chan command

runChecker func()
}

func (c *checker) initDefaults() {
c.interval = 60 * time.Second
c.timeout = 15 * time.Second
c.mutex = &sync.RWMutex{}
c.isRunning = false
c.err = nil
c.healthy = true
c.lastUpdate = time.Now()
c.commands = make(chan command)

c.runChecker = func() {
panic("BUG: implement runChecker() in the final checker struct")
}
}

func (c *checker) start() {
if c.isRunning {
return
}

go c.runChecker()
}

func (c *checker) stop() {
c.commands <- stopCommand
}

func (c *checker) isHealthy() (bool, error) {
// Check the time of the last update; it should satisfy
//
//   time.Since(c.lastUpdate) < (c.interval + c.timeout)
//
// Without this grace period, a single slow write/read would already be
// enough to mark the volume as unhealthy and trigger recovery actions.
//
// The check is required in case the write or read in the go routine
// is blocked.

delay := time.Since(c.lastUpdate)
if delay > (c.interval + c.timeout) {
c.mutex.Lock()
c.healthy = false
c.err = fmt.Errorf("health-check has not responded for %f seconds", delay.Seconds())
c.mutex.Unlock()
}

// read lock to get consistency between the return values
c.mutex.RLock()
defer c.mutex.RUnlock()

return c.healthy, c.err
}
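
The concrete checkers are not part of the hunk shown above. A minimal sketch
of how a StatChecker could build on this base struct; the names and details
are assumptions, the real implementation in `internal/health-checker` may
differ:

```go
package healthchecker

import (
	"os"
	"time"
)

// statChecker is a hypothetical concrete checker that periodically runs
// stat() on a path and records the result in the embedded checker struct.
type statChecker struct {
	checker

	// path is the directory (or mountpoint) to stat() periodically.
	path string
}

func newStatChecker(path string) *statChecker {
	sc := &statChecker{path: path}
	sc.initDefaults()

	sc.runChecker = func() {
		sc.isRunning = true

		ticker := time.NewTicker(sc.interval)
		defer ticker.Stop()

		for {
			select {
			case <-sc.commands:
				// a stopCommand ends the checking go routine
				sc.isRunning = false

				return
			case now := <-ticker.C:
				// if stat() hangs here, lastUpdate stops advancing
				_, err := os.Stat(sc.path)

				sc.mutex.Lock()
				sc.healthy = err == nil
				sc.err = err
				sc.lastUpdate = now
				sc.mutex.Unlock()
			}
		}
	}

	return sc
}
```

Calling start() runs this loop in a go routine. When stat() blocks on a slow
MDS, lastUpdate stops advancing and isHealthy() reports the volume as
unhealthy once the interval + timeout window has passed, which is what
NodeGetVolumeStats relies on to avoid hanging.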