diff --git a/docs/design/proposals/volume-condition.md b/docs/design/proposals/volume-condition.md new file mode 100644 index 00000000000..bc56f0fdfb3 --- /dev/null +++ b/docs/design/proposals/volume-condition.md @@ -0,0 +1,25 @@ +# Support for CSI `VolumeCondition` aka Volume Health Checker + +## health-checker API + +Under `internal/health-checker` the Manager for health-checking is +implemented. The Manager can start a checking process for a given path, return +the (un)healthy state and stop the checking process when the volume is not +needed anymore. + +The Manager is responsible for creating a suitable checker for the requested +path. If the path is a block-device, the BlockChecker should be created. For a +filesystem path (directory), the FileChecker is appropriate. + +## CephFS + +The health-checker writes to the file `csi-volume-condition.ts` in the root of +the volume. This file contains a JSON formatted timestamp. + +A new `data` directory is introduced for newly created volumes. During the +`NodeStageVolume` call the root of the volume is mounted, and the `data` +directory is bind-mounted inside the container when `NodePublishVolume` is +called. + +The `data` directory makes it possible to place Ceph-CSI internal files in the +root of the volume, without that the user/application has access to it. diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 2ff56750f82..781c89e9ad3 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -25,6 +25,7 @@ import ( casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs" csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + hc "github.com/ceph/ceph-csi/internal/health-checker" "github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -82,6 +83,7 @@ func NewNodeServer( VolumeLocks: util.NewVolumeLocks(), kernelMountOptions: kernelMountOptions, fuseMountOptions: fuseMountOptions, + healthChecker: hc.NewHealthCheckManager(), } } diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 7287cde7605..94a9cb492c8 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -29,6 +29,7 @@ import ( "github.com/ceph/ceph-csi/internal/cephfs/store" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + hc "github.com/ceph/ceph-csi/internal/health-checker" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/fscrypt" "github.com/ceph/ceph-csi/internal/util/log" @@ -47,6 +48,7 @@ type NodeServer struct { VolumeLocks *util.VolumeLocks kernelMountOptions string fuseMountOptions string + healthChecker hc.Manager } func getCredentialsForVolume( @@ -228,6 +230,8 @@ func (ns *NodeServer) NodeStageVolume( return nil, status.Error(codes.Internal, err.Error()) } + ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil } @@ -270,9 +274,24 @@ func (ns *NodeServer) NodeStageVolume( } } + ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil } +// startSharedHealthChecker starts a health-checker on the stagingTargetPath. +// This checker can be shared between multiple containers. +// +// TODO: start a FileChecker for read-writable volumes that have an app-data subdir. +func (ns *NodeServer) startSharedHealthChecker(ctx context.Context, volumeID, dir string) { + // The StatChecker works for volumes that do not have a dedicated app-data + // subdirectory, or are read-only. + err := ns.healthChecker.StartSharedChecker(volumeID, dir, hc.StatCheckerType) + if err != nil { + log.WarningLog(ctx, "failed to start healthchecker: %v", err) + } +} + func (ns *NodeServer) mount( ctx context.Context, mnt mounter.VolumeMounter, @@ -479,7 +498,8 @@ func (ns *NodeServer) NodePublishVolume( // Ensure staging target path is a mountpoint. - if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil { + isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath) + if err != nil { log.ErrorLog(ctx, "stat failed: %v", err) return nil, status.Error(codes.Internal, err.Error()) @@ -491,7 +511,7 @@ func (ns *NodeServer) NodePublishVolume( // Check if the volume is already mounted - isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) + isMnt, err = util.IsMountPoint(ns.Mounter, targetPath) if err != nil { log.ErrorLog(ctx, "stat failed: %v", err) @@ -545,6 +565,10 @@ func (ns *NodeServer) NodeUnpublishVolume( // considering kubelet make sure node operations like unpublish/unstage...etc can not be called // at same time, an explicit locking at time of nodeunpublish is not required. targetPath := req.GetTargetPath() + + // stop the health-checker that may have been started in NodeGetVolumeStats() + ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath) + isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) if err != nil { log.ErrorLog(ctx, "stat failed: %v", err) @@ -599,6 +623,9 @@ func (ns *NodeServer) NodeUnstageVolume( } volID := req.GetVolumeId() + + ns.healthChecker.StopSharedChecker(volID) + if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired { log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID) @@ -670,6 +697,13 @@ func (ns *NodeServer) NodeGetCapabilities( }, }, }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION, + }, + }, + }, { Type: &csi.NodeServiceCapability_Rpc{ Rpc: &csi.NodeServiceCapability_RPC{ @@ -694,6 +728,35 @@ func (ns *NodeServer) NodeGetVolumeStats( return nil, status.Error(codes.InvalidArgument, err.Error()) } + // health check first, return without stats if unhealthy + healthy, msg := ns.healthChecker.IsHealthy(req.GetVolumeId(), targetPath) + + // If healthy and an error is returned, it means that the checker was not + // started. This could happen when the node-plugin was restarted and the + // volume is already staged and published. + if healthy && msg != nil { + // Start a StatChecker for the mounted targetPath, this prevents + // writing a file in the user-visible location. Ideally a (shared) + // FileChecker is started with the stagingTargetPath, but we can't + // get the stagingPath from the request easily. + // TODO: resolve the stagingPath like rbd.getStagingPath() does + err = ns.healthChecker.StartChecker(req.GetVolumeId(), targetPath, hc.StatCheckerType) + if err != nil { + log.WarningLog(ctx, "failed to start healthchecker: %v", err) + } + } + + // !healthy indicates a problem with the volume + if !healthy { + return &csi.NodeGetVolumeStatsResponse{ + VolumeCondition: &csi.VolumeCondition{ + Abnormal: true, + Message: msg.Error(), + }, + }, nil + } + + // warning: stat() may hang on an unhealthy volume stat, err := os.Stat(targetPath) if err != nil { if util.IsCorruptedMountError(err) { diff --git a/internal/csi-common/utils.go b/internal/csi-common/utils.go index a2bb4ab862d..9c333c9a72f 100644 --- a/internal/csi-common/utils.go +++ b/internal/csi-common/utils.go @@ -312,6 +312,12 @@ func FilesystemNodeGetVolumeStats( }) } + // include marker for a healthy volume by default + res.VolumeCondition = &csi.VolumeCondition{ + Abnormal: false, + Message: "volume is in a healthy condition", + } + return res, nil } diff --git a/internal/health-checker/checker.go b/internal/health-checker/checker.go new file mode 100644 index 00000000000..5eef779b5a5 --- /dev/null +++ b/internal/health-checker/checker.go @@ -0,0 +1,107 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "fmt" + "sync" + "time" +) + +// command is what is sent through the channel to terminate the go routine. +type command string + +const ( + // stopCommand is sent through the channel to stop checking. + stopCommand = command("STOP") +) + +type checker struct { + // interval contains the time to sleep between health checks. + interval time.Duration + + // timeout contains the delay (interval + timeout) + timeout time.Duration + + // mutex protects against concurrent access to healty, err and + // lastUpdate + mutex *sync.RWMutex + + // current status + isRunning bool + healthy bool + err error + lastUpdate time.Time + + // commands is the channel to read commands from; when to stop. + commands chan command + + runChecker func() +} + +func (c *checker) initDefaults() { + c.interval = 60 * time.Second + c.timeout = 15 * time.Second + c.mutex = &sync.RWMutex{} + c.isRunning = false + c.err = nil + c.healthy = true + c.lastUpdate = time.Now() + c.commands = make(chan command) + + c.runChecker = func() { + panic("BUG: implement runChecker() in the final checker struct") + } +} + +func (c *checker) start() { + if c.isRunning { + return + } + + go c.runChecker() +} + +func (c *checker) stop() { + c.commands <- stopCommand +} + +func (c *checker) isHealthy() (bool, error) { + // check for the last update, it should be within + // + // c.lastUpdate < (c.interval + c.timeout) + // + // Without such a check, a single slow write/read could trigger actions + // to recover an unhealthy volume already. + // + // It is required to check, in case the write or read in the go routine + // is blocked. + + delay := time.Since(c.lastUpdate) + if delay > (c.interval + c.timeout) { + c.mutex.Lock() + c.healthy = false + c.err = fmt.Errorf("health-check has not responded for %f seconds", delay.Seconds()) + c.mutex.Unlock() + } + + // read lock to get consistency between the return values + c.mutex.RLock() + defer c.mutex.RUnlock() + + return c.healthy, c.err +} diff --git a/internal/health-checker/filechecker.go b/internal/health-checker/filechecker.go new file mode 100644 index 00000000000..4d7defc292d --- /dev/null +++ b/internal/health-checker/filechecker.go @@ -0,0 +1,118 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "errors" + "os" + "path" + "time" +) + +type fileChecker struct { + checker + + // filename contains the filename that is used for checking. + filename string +} + +func newFileChecker(dir string) ConditionChecker { + fc := &fileChecker{ + filename: path.Join(dir, "csi-volume-condition.ts"), + } + fc.initDefaults() + + fc.checker.runChecker = func() { + fc.isRunning = true + + ticker := time.NewTicker(fc.interval) + defer ticker.Stop() + + for { + select { + case <-fc.commands: // STOP command received + fc.isRunning = false + + return + case now := <-ticker.C: + err := fc.writeTimestamp(now) + if err != nil { + fc.mutex.Lock() + fc.healthy = false + fc.err = err + fc.mutex.Unlock() + + continue + } + + ts, err := fc.readTimestamp() + if err != nil { + fc.mutex.Lock() + fc.healthy = false + fc.err = err + fc.mutex.Unlock() + + continue + } + + // verify that the written timestamp is read back + if now.Compare(ts) != 0 { + fc.mutex.Lock() + fc.healthy = false + fc.err = errors.New("timestamp read from file does not match what was written") + fc.mutex.Unlock() + + continue + } + + // run health check, write a timestamp to a file, read it back + fc.mutex.Lock() + fc.healthy = true + fc.err = nil + fc.lastUpdate = ts + fc.mutex.Unlock() + } + } + } + + return fc +} + +// readTimestamp reads the JSON formatted timestamp from the file. +func (fc *fileChecker) readTimestamp() (time.Time, error) { + var ts time.Time + + data, err := os.ReadFile(fc.filename) + if err != nil { + return ts, err + } + + err = ts.UnmarshalJSON(data) + + return ts, err +} + +// writeTimestamp writes the timestamp to the file in JSON format. +func (fc *fileChecker) writeTimestamp(ts time.Time) error { + data, err := ts.MarshalJSON() + if err != nil { + return err + } + + //nolint:gosec // allow reading of the timestamp for debugging + return os.WriteFile(fc.filename, data, 0o644) +} diff --git a/internal/health-checker/filechecker_test.go b/internal/health-checker/filechecker_test.go new file mode 100644 index 00000000000..3c67c5b843b --- /dev/null +++ b/internal/health-checker/filechecker_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "testing" + "time" +) + +func TestFileChecker(t *testing.T) { + t.Parallel() + + volumePath := t.TempDir() + fc := newFileChecker(volumePath) + checker, ok := fc.(*fileChecker) + if !ok { + t.Errorf("failed to convert fc to *fileChecker: %v", fc) + } + checker.interval = time.Second * 5 + + // start the checker + checker.start() + + // wait a second to get the go routine running + time.Sleep(time.Second) + if !checker.isRunning { + t.Error("checker failed to start") + } + + for i := 0; i < 10; i++ { + // check health, should be healthy + healthy, msg := checker.isHealthy() + if !healthy || msg != nil { + t.Error("volume is unhealthy") + } + + time.Sleep(time.Second) + } + + if !checker.isRunning { + t.Error("runChecker() exited already") + } + + // stop the checker + checker.stop() +} + +func TestWriteReadTimestamp(t *testing.T) { + t.Parallel() + + volumePath := t.TempDir() + fc := newFileChecker(volumePath) + checker, ok := fc.(*fileChecker) + if !ok { + t.Errorf("failed to convert fc to *fileChecker: %v", fc) + } + ts := time.Now() + + err := checker.writeTimestamp(ts) + if err != nil { + t.Fatalf("failed to write timestamp: %v", err) + } + + _, err = checker.readTimestamp() + if err != nil { + t.Fatalf("failed to read timestamp: %v", err) + } +} diff --git a/internal/health-checker/manager.go b/internal/health-checker/manager.go new file mode 100644 index 00000000000..ed78ec6fa52 --- /dev/null +++ b/internal/health-checker/manager.go @@ -0,0 +1,208 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "fmt" + "os" + "path/filepath" + "sync" +) + +// CheckerType describes the type of health-check that needs to be done. +type CheckerType uint64 + +const ( + // StatCheckerType uses the stat() syscall to validate volume health. + StatCheckerType = iota + // FileCheckerType writes and reads a timestamp to a file for checking the + // volume health. + FileCheckerType +) + +// Manager provides the API for getting the health status of a volume. The main +// usage is requesting the health status by volumeID. +// +// When the Manager detects that a new volumeID is used for checking, a new +// instance of a ConditionChecker is created for the volumeID on the given +// path, and started. +// +// Once the volumeID is not active anymore (when NodeUnstageVolume is called), +// the ConditionChecker needs to be stopped, which can be done by +// Manager.StopChecker(). +type Manager interface { + // StartChecker starts a health-checker of the requested type for the + // volumeID using the path. The path usually is the publishTargetPath, and + // a unique path for this checker. If the path can be used by multiple + // containers, use the StartSharedChecker function instead. + StartChecker(volumeID, path string, ct CheckerType) error + + // StartSharedChecker starts a health-checker of the requested type for the + // volumeID using the path. The path usually is the stagingTargetPath, and + // can be used for multiple containers. + StartSharedChecker(volumeID, path string, ct CheckerType) error + + StopChecker(volumeID, path string) + StopSharedChecker(volumeID string) + + // IsHealthy locates the checker for the volumeID and path. If no checker + // is found, `true` is returned together with an error message. + // When IsHealthy runs into an internal error, it is assumed that the + // volume is healthy. Only when it is confirmed that the volume is + // unhealthy, `false` is returned together with an error message. + IsHealthy(volumeID, path string) (bool, error) +} + +// ConditionChecker describes the interface that a health status reporter needs +// to implement. It is used internally by the Manager only. +type ConditionChecker interface { + // start runs a the health checking function in a new go routine. + start() + + // stop terminates a the health checking function that runs in a go + // routine. + stop() + + // isHealthy returns the status of the volume, without blocking. + isHealthy() (bool, error) +} + +type healthCheckManager struct { + checkers sync.Map // map[volumeID]ConditionChecker +} + +func NewHealthCheckManager() Manager { + return &healthCheckManager{ + checkers: sync.Map{}, + } +} + +func (hcm *healthCheckManager) StartSharedChecker(volumeID, path string, ct CheckerType) error { + return hcm.createChecker(volumeID, path, ct, true) +} + +func (hcm *healthCheckManager) StartChecker(volumeID, path string, ct CheckerType) error { + return hcm.createChecker(volumeID, path, ct, false) +} + +// createChecker decides based on the CheckerType what checker to start for +// the volume. +func (hcm *healthCheckManager) createChecker(volumeID, path string, ct CheckerType, shared bool) error { + switch ct { + case FileCheckerType: + return hcm.startFileChecker(volumeID, path, shared) + case StatCheckerType: + return hcm.startStatChecker(volumeID, path, shared) + } + + return nil +} + +// startFileChecker initializes the fileChecker and starts it. +func (hcm *healthCheckManager) startFileChecker(volumeID, path string, shared bool) error { + workdir := filepath.Join(path, ".csi") + err := os.Mkdir(workdir, 0o755) + if err != nil && !os.IsExist(err) { + return fmt.Errorf("failed to created workdir %q for health-checker: %w", workdir, err) + } + + cc := newFileChecker(workdir) + + return hcm.startChecker(cc, volumeID, path, shared) +} + +// startStatChecker initializes the statChecker and starts it. +func (hcm *healthCheckManager) startStatChecker(volumeID, path string, shared bool) error { + cc := newStatChecker(path) + + return hcm.startChecker(cc, volumeID, path, shared) +} + +// startChecker adds the checker to its map and starts it. +// Shared checkers are key'd by their volumeID, whereas non-shared checkers +// are key'd by theit volumeID+path. +func (hcm *healthCheckManager) startChecker(cc ConditionChecker, volumeID, path string, shared bool) error { + key := volumeID + if shared { + key = fallbackKey(volumeID, path) + } + + // load the 'old' ConditionChecker if it exists, otherwise store 'cc' + old, ok := hcm.checkers.LoadOrStore(key, cc) + if ok { + // 'old' was loaded, cast it to ConditionChecker + _, ok = old.(ConditionChecker) + if !ok { + return fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID) + } + } else { + // 'cc' was stored, start it only once + cc.start() + } + + return nil +} + +func (hcm *healthCheckManager) StopSharedChecker(volumeID string) { + hcm.StopChecker(volumeID, "") +} + +func (hcm *healthCheckManager) StopChecker(volumeID, path string) { + old, ok := hcm.checkers.LoadAndDelete(fallbackKey(volumeID, path)) + if !ok { + // nothing was loaded, nothing to do + return + } + + // 'old' was loaded, cast it to ConditionChecker + cc, ok := old.(ConditionChecker) + if !ok { + // failed to cast, should not be possible + return + } + cc.stop() +} + +func (hcm *healthCheckManager) IsHealthy(volumeID, path string) (bool, error) { + // load the 'old' ConditionChecker if it exists + old, ok := hcm.checkers.Load(volumeID) + if !ok { + // try fallback which include an optional (unique) path (usually publishTargetPath) + old, ok = hcm.checkers.Load(fallbackKey(volumeID, path)) + if !ok { + return true, fmt.Errorf("no ConditionChecker for volume-id: %s", volumeID) + } + } + + // 'old' was loaded, cast it to ConditionChecker + cc, ok := old.(ConditionChecker) + if !ok { + return true, fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID) + } + + return cc.isHealthy() +} + +// fallbackKey returns the key for a checker in the map. If the path is empty, +// it is assumed that the key'd checked is shared. +func fallbackKey(volumeID, path string) string { + if path == "" { + return volumeID + } + + return fmt.Sprintf("%s:%s", volumeID, path) +} diff --git a/internal/health-checker/manager_test.go b/internal/health-checker/manager_test.go new file mode 100644 index 00000000000..2c44f65b1e7 --- /dev/null +++ b/internal/health-checker/manager_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "testing" +) + +func TestManager(t *testing.T) { + t.Parallel() + + volumeID := "fake-volume-id" + volumePath := t.TempDir() + mgr := NewHealthCheckManager() + + // expected to have an error in msg + healthy, msg := mgr.IsHealthy(volumeID, volumePath) + if !(healthy && msg != nil) { + t.Error("ConditionChecker was not started yet, did not get an error") + } + + t.Log("start the checker") + err := mgr.StartChecker(volumeID, volumePath, StatCheckerType) + if err != nil { + t.Fatalf("ConditionChecker could not get started: %v", err) + } + + t.Log("check health, should be healthy") + healthy, msg = mgr.IsHealthy(volumeID, volumePath) + if !healthy || err != nil { + t.Errorf("volume is unhealthy: %s", msg) + } + + t.Log("stop the checker") + mgr.StopChecker(volumeID, volumePath) +} + +func TestSharedChecker(t *testing.T) { + t.Parallel() + + volumeID := "fake-volume-id" + volumePath := t.TempDir() + mgr := NewHealthCheckManager() + + // expected to have an error in msg + healthy, msg := mgr.IsHealthy(volumeID, volumePath) + if !(healthy && msg != nil) { + t.Error("ConditionChecker was not started yet, did not get an error") + } + + t.Log("start the checker") + err := mgr.StartSharedChecker(volumeID, volumePath, StatCheckerType) + if err != nil { + t.Fatalf("ConditionChecker could not get started: %v", err) + } + + t.Log("check health, should be healthy") + healthy, msg = mgr.IsHealthy(volumeID, volumePath) + if !healthy || err != nil { + t.Errorf("volume is unhealthy: %s", msg) + } + + t.Log("check health, should be healthy, path is ignored") + healthy, msg = mgr.IsHealthy(volumeID, "different-path") + if !healthy || err != nil { + t.Errorf("volume is unhealthy: %s", msg) + } + + t.Log("stop the checker") + mgr.StopSharedChecker(volumeID) +} diff --git a/internal/health-checker/statchecker.go b/internal/health-checker/statchecker.go new file mode 100644 index 00000000000..d5ef6988342 --- /dev/null +++ b/internal/health-checker/statchecker.go @@ -0,0 +1,70 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "os" + "time" +) + +type statChecker struct { + checker + + // dirname points to the directory that is used for checking. + dirname string +} + +func newStatChecker(dir string) ConditionChecker { + sc := &statChecker{ + dirname: dir, + } + sc.initDefaults() + + sc.checker.runChecker = func() { + sc.isRunning = true + + ticker := time.NewTicker(sc.interval) + defer ticker.Stop() + + for { + select { + case <-sc.commands: // STOP command received + sc.isRunning = false + + return + case now := <-ticker.C: + _, err := os.Stat(sc.dirname) + if err != nil { + sc.mutex.Lock() + sc.healthy = false + sc.err = err + sc.mutex.Unlock() + + continue + } + + sc.mutex.Lock() + sc.healthy = true + sc.err = nil + sc.lastUpdate = now + sc.mutex.Unlock() + } + } + } + + return sc +} diff --git a/internal/health-checker/statchecker_test.go b/internal/health-checker/statchecker_test.go new file mode 100644 index 00000000000..4ea247803e9 --- /dev/null +++ b/internal/health-checker/statchecker_test.go @@ -0,0 +1,60 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "testing" + "time" +) + +func TestStatChecker(t *testing.T) { + t.Parallel() + + volumePath := t.TempDir() + sc := newStatChecker(volumePath) + checker, ok := sc.(*statChecker) + if !ok { + t.Errorf("failed to convert fc to *fileChecker: %v", sc) + } + checker.interval = time.Second * 5 + + // start the checker + checker.start() + + // wait a second to get the go routine running + time.Sleep(time.Second) + if !checker.isRunning { + t.Error("checker failed to start") + } + + for i := 0; i < 10; i++ { + // check health, should be healthy + healthy, msg := checker.isHealthy() + if !healthy || msg != nil { + t.Error("volume is unhealthy") + } + + time.Sleep(time.Second) + } + + if !checker.isRunning { + t.Error("runChecker() exited already") + } + + // stop the checker + checker.stop() +}