From 72a2f536a94ed5e66070b4cd75aaf52d75388e03 Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Thu, 14 Sep 2023 17:45:01 +0200 Subject: [PATCH 1/4] doc: Add initial design notes for the Health Checker Signed-off-by: Niels de Vos --- docs/design/proposals/volume-condition.md | 25 +++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 docs/design/proposals/volume-condition.md diff --git a/docs/design/proposals/volume-condition.md b/docs/design/proposals/volume-condition.md new file mode 100644 index 00000000000..bc56f0fdfb3 --- /dev/null +++ b/docs/design/proposals/volume-condition.md @@ -0,0 +1,25 @@ +# Support for CSI `VolumeCondition` aka Volume Health Checker + +## health-checker API + +Under `internal/health-checker` the Manager for health-checking is +implemented. The Manager can start a checking process for a given path, return +the (un)healthy state and stop the checking process when the volume is not +needed anymore. + +The Manager is responsible for creating a suitable checker for the requested +path. If the path is a block-device, the BlockChecker should be created. For a +filesystem path (directory), the FileChecker is appropriate. + +## CephFS + +The health-checker writes to the file `csi-volume-condition.ts` in the root of +the volume. This file contains a JSON formatted timestamp. + +A new `data` directory is introduced for newly created volumes. During the +`NodeStageVolume` call the root of the volume is mounted, and the `data` +directory is bind-mounted inside the container when `NodePublishVolume` is +called. + +The `data` directory makes it possible to place Ceph-CSI internal files in the +root of the volume, without that the user/application has access to it. From d4ceaf13f68024465d935737c5dbf6a683d8a182 Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Wed, 13 Sep 2023 16:02:16 +0200 Subject: [PATCH 2/4] util: add health-checker for periodic filesystem checks Signed-off-by: Niels de Vos --- internal/health-checker/checker.go | 107 ++++++++++ internal/health-checker/filechecker.go | 118 +++++++++++ internal/health-checker/filechecker_test.go | 82 ++++++++ internal/health-checker/manager.go | 208 ++++++++++++++++++++ internal/health-checker/manager_test.go | 85 ++++++++ internal/health-checker/statchecker.go | 70 +++++++ internal/health-checker/statchecker_test.go | 60 ++++++ 7 files changed, 730 insertions(+) create mode 100644 internal/health-checker/checker.go create mode 100644 internal/health-checker/filechecker.go create mode 100644 internal/health-checker/filechecker_test.go create mode 100644 internal/health-checker/manager.go create mode 100644 internal/health-checker/manager_test.go create mode 100644 internal/health-checker/statchecker.go create mode 100644 internal/health-checker/statchecker_test.go diff --git a/internal/health-checker/checker.go b/internal/health-checker/checker.go new file mode 100644 index 00000000000..5eef779b5a5 --- /dev/null +++ b/internal/health-checker/checker.go @@ -0,0 +1,107 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "fmt" + "sync" + "time" +) + +// command is what is sent through the channel to terminate the go routine. +type command string + +const ( + // stopCommand is sent through the channel to stop checking. + stopCommand = command("STOP") +) + +type checker struct { + // interval contains the time to sleep between health checks. + interval time.Duration + + // timeout contains the delay (interval + timeout) + timeout time.Duration + + // mutex protects against concurrent access to healty, err and + // lastUpdate + mutex *sync.RWMutex + + // current status + isRunning bool + healthy bool + err error + lastUpdate time.Time + + // commands is the channel to read commands from; when to stop. + commands chan command + + runChecker func() +} + +func (c *checker) initDefaults() { + c.interval = 60 * time.Second + c.timeout = 15 * time.Second + c.mutex = &sync.RWMutex{} + c.isRunning = false + c.err = nil + c.healthy = true + c.lastUpdate = time.Now() + c.commands = make(chan command) + + c.runChecker = func() { + panic("BUG: implement runChecker() in the final checker struct") + } +} + +func (c *checker) start() { + if c.isRunning { + return + } + + go c.runChecker() +} + +func (c *checker) stop() { + c.commands <- stopCommand +} + +func (c *checker) isHealthy() (bool, error) { + // check for the last update, it should be within + // + // c.lastUpdate < (c.interval + c.timeout) + // + // Without such a check, a single slow write/read could trigger actions + // to recover an unhealthy volume already. + // + // It is required to check, in case the write or read in the go routine + // is blocked. + + delay := time.Since(c.lastUpdate) + if delay > (c.interval + c.timeout) { + c.mutex.Lock() + c.healthy = false + c.err = fmt.Errorf("health-check has not responded for %f seconds", delay.Seconds()) + c.mutex.Unlock() + } + + // read lock to get consistency between the return values + c.mutex.RLock() + defer c.mutex.RUnlock() + + return c.healthy, c.err +} diff --git a/internal/health-checker/filechecker.go b/internal/health-checker/filechecker.go new file mode 100644 index 00000000000..4d7defc292d --- /dev/null +++ b/internal/health-checker/filechecker.go @@ -0,0 +1,118 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "errors" + "os" + "path" + "time" +) + +type fileChecker struct { + checker + + // filename contains the filename that is used for checking. + filename string +} + +func newFileChecker(dir string) ConditionChecker { + fc := &fileChecker{ + filename: path.Join(dir, "csi-volume-condition.ts"), + } + fc.initDefaults() + + fc.checker.runChecker = func() { + fc.isRunning = true + + ticker := time.NewTicker(fc.interval) + defer ticker.Stop() + + for { + select { + case <-fc.commands: // STOP command received + fc.isRunning = false + + return + case now := <-ticker.C: + err := fc.writeTimestamp(now) + if err != nil { + fc.mutex.Lock() + fc.healthy = false + fc.err = err + fc.mutex.Unlock() + + continue + } + + ts, err := fc.readTimestamp() + if err != nil { + fc.mutex.Lock() + fc.healthy = false + fc.err = err + fc.mutex.Unlock() + + continue + } + + // verify that the written timestamp is read back + if now.Compare(ts) != 0 { + fc.mutex.Lock() + fc.healthy = false + fc.err = errors.New("timestamp read from file does not match what was written") + fc.mutex.Unlock() + + continue + } + + // run health check, write a timestamp to a file, read it back + fc.mutex.Lock() + fc.healthy = true + fc.err = nil + fc.lastUpdate = ts + fc.mutex.Unlock() + } + } + } + + return fc +} + +// readTimestamp reads the JSON formatted timestamp from the file. +func (fc *fileChecker) readTimestamp() (time.Time, error) { + var ts time.Time + + data, err := os.ReadFile(fc.filename) + if err != nil { + return ts, err + } + + err = ts.UnmarshalJSON(data) + + return ts, err +} + +// writeTimestamp writes the timestamp to the file in JSON format. +func (fc *fileChecker) writeTimestamp(ts time.Time) error { + data, err := ts.MarshalJSON() + if err != nil { + return err + } + + //nolint:gosec // allow reading of the timestamp for debugging + return os.WriteFile(fc.filename, data, 0o644) +} diff --git a/internal/health-checker/filechecker_test.go b/internal/health-checker/filechecker_test.go new file mode 100644 index 00000000000..3c67c5b843b --- /dev/null +++ b/internal/health-checker/filechecker_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "testing" + "time" +) + +func TestFileChecker(t *testing.T) { + t.Parallel() + + volumePath := t.TempDir() + fc := newFileChecker(volumePath) + checker, ok := fc.(*fileChecker) + if !ok { + t.Errorf("failed to convert fc to *fileChecker: %v", fc) + } + checker.interval = time.Second * 5 + + // start the checker + checker.start() + + // wait a second to get the go routine running + time.Sleep(time.Second) + if !checker.isRunning { + t.Error("checker failed to start") + } + + for i := 0; i < 10; i++ { + // check health, should be healthy + healthy, msg := checker.isHealthy() + if !healthy || msg != nil { + t.Error("volume is unhealthy") + } + + time.Sleep(time.Second) + } + + if !checker.isRunning { + t.Error("runChecker() exited already") + } + + // stop the checker + checker.stop() +} + +func TestWriteReadTimestamp(t *testing.T) { + t.Parallel() + + volumePath := t.TempDir() + fc := newFileChecker(volumePath) + checker, ok := fc.(*fileChecker) + if !ok { + t.Errorf("failed to convert fc to *fileChecker: %v", fc) + } + ts := time.Now() + + err := checker.writeTimestamp(ts) + if err != nil { + t.Fatalf("failed to write timestamp: %v", err) + } + + _, err = checker.readTimestamp() + if err != nil { + t.Fatalf("failed to read timestamp: %v", err) + } +} diff --git a/internal/health-checker/manager.go b/internal/health-checker/manager.go new file mode 100644 index 00000000000..ed78ec6fa52 --- /dev/null +++ b/internal/health-checker/manager.go @@ -0,0 +1,208 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "fmt" + "os" + "path/filepath" + "sync" +) + +// CheckerType describes the type of health-check that needs to be done. +type CheckerType uint64 + +const ( + // StatCheckerType uses the stat() syscall to validate volume health. + StatCheckerType = iota + // FileCheckerType writes and reads a timestamp to a file for checking the + // volume health. + FileCheckerType +) + +// Manager provides the API for getting the health status of a volume. The main +// usage is requesting the health status by volumeID. +// +// When the Manager detects that a new volumeID is used for checking, a new +// instance of a ConditionChecker is created for the volumeID on the given +// path, and started. +// +// Once the volumeID is not active anymore (when NodeUnstageVolume is called), +// the ConditionChecker needs to be stopped, which can be done by +// Manager.StopChecker(). +type Manager interface { + // StartChecker starts a health-checker of the requested type for the + // volumeID using the path. The path usually is the publishTargetPath, and + // a unique path for this checker. If the path can be used by multiple + // containers, use the StartSharedChecker function instead. + StartChecker(volumeID, path string, ct CheckerType) error + + // StartSharedChecker starts a health-checker of the requested type for the + // volumeID using the path. The path usually is the stagingTargetPath, and + // can be used for multiple containers. + StartSharedChecker(volumeID, path string, ct CheckerType) error + + StopChecker(volumeID, path string) + StopSharedChecker(volumeID string) + + // IsHealthy locates the checker for the volumeID and path. If no checker + // is found, `true` is returned together with an error message. + // When IsHealthy runs into an internal error, it is assumed that the + // volume is healthy. Only when it is confirmed that the volume is + // unhealthy, `false` is returned together with an error message. + IsHealthy(volumeID, path string) (bool, error) +} + +// ConditionChecker describes the interface that a health status reporter needs +// to implement. It is used internally by the Manager only. +type ConditionChecker interface { + // start runs a the health checking function in a new go routine. + start() + + // stop terminates a the health checking function that runs in a go + // routine. + stop() + + // isHealthy returns the status of the volume, without blocking. + isHealthy() (bool, error) +} + +type healthCheckManager struct { + checkers sync.Map // map[volumeID]ConditionChecker +} + +func NewHealthCheckManager() Manager { + return &healthCheckManager{ + checkers: sync.Map{}, + } +} + +func (hcm *healthCheckManager) StartSharedChecker(volumeID, path string, ct CheckerType) error { + return hcm.createChecker(volumeID, path, ct, true) +} + +func (hcm *healthCheckManager) StartChecker(volumeID, path string, ct CheckerType) error { + return hcm.createChecker(volumeID, path, ct, false) +} + +// createChecker decides based on the CheckerType what checker to start for +// the volume. +func (hcm *healthCheckManager) createChecker(volumeID, path string, ct CheckerType, shared bool) error { + switch ct { + case FileCheckerType: + return hcm.startFileChecker(volumeID, path, shared) + case StatCheckerType: + return hcm.startStatChecker(volumeID, path, shared) + } + + return nil +} + +// startFileChecker initializes the fileChecker and starts it. +func (hcm *healthCheckManager) startFileChecker(volumeID, path string, shared bool) error { + workdir := filepath.Join(path, ".csi") + err := os.Mkdir(workdir, 0o755) + if err != nil && !os.IsExist(err) { + return fmt.Errorf("failed to created workdir %q for health-checker: %w", workdir, err) + } + + cc := newFileChecker(workdir) + + return hcm.startChecker(cc, volumeID, path, shared) +} + +// startStatChecker initializes the statChecker and starts it. +func (hcm *healthCheckManager) startStatChecker(volumeID, path string, shared bool) error { + cc := newStatChecker(path) + + return hcm.startChecker(cc, volumeID, path, shared) +} + +// startChecker adds the checker to its map and starts it. +// Shared checkers are key'd by their volumeID, whereas non-shared checkers +// are key'd by theit volumeID+path. +func (hcm *healthCheckManager) startChecker(cc ConditionChecker, volumeID, path string, shared bool) error { + key := volumeID + if shared { + key = fallbackKey(volumeID, path) + } + + // load the 'old' ConditionChecker if it exists, otherwise store 'cc' + old, ok := hcm.checkers.LoadOrStore(key, cc) + if ok { + // 'old' was loaded, cast it to ConditionChecker + _, ok = old.(ConditionChecker) + if !ok { + return fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID) + } + } else { + // 'cc' was stored, start it only once + cc.start() + } + + return nil +} + +func (hcm *healthCheckManager) StopSharedChecker(volumeID string) { + hcm.StopChecker(volumeID, "") +} + +func (hcm *healthCheckManager) StopChecker(volumeID, path string) { + old, ok := hcm.checkers.LoadAndDelete(fallbackKey(volumeID, path)) + if !ok { + // nothing was loaded, nothing to do + return + } + + // 'old' was loaded, cast it to ConditionChecker + cc, ok := old.(ConditionChecker) + if !ok { + // failed to cast, should not be possible + return + } + cc.stop() +} + +func (hcm *healthCheckManager) IsHealthy(volumeID, path string) (bool, error) { + // load the 'old' ConditionChecker if it exists + old, ok := hcm.checkers.Load(volumeID) + if !ok { + // try fallback which include an optional (unique) path (usually publishTargetPath) + old, ok = hcm.checkers.Load(fallbackKey(volumeID, path)) + if !ok { + return true, fmt.Errorf("no ConditionChecker for volume-id: %s", volumeID) + } + } + + // 'old' was loaded, cast it to ConditionChecker + cc, ok := old.(ConditionChecker) + if !ok { + return true, fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID) + } + + return cc.isHealthy() +} + +// fallbackKey returns the key for a checker in the map. If the path is empty, +// it is assumed that the key'd checked is shared. +func fallbackKey(volumeID, path string) string { + if path == "" { + return volumeID + } + + return fmt.Sprintf("%s:%s", volumeID, path) +} diff --git a/internal/health-checker/manager_test.go b/internal/health-checker/manager_test.go new file mode 100644 index 00000000000..2c44f65b1e7 --- /dev/null +++ b/internal/health-checker/manager_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "testing" +) + +func TestManager(t *testing.T) { + t.Parallel() + + volumeID := "fake-volume-id" + volumePath := t.TempDir() + mgr := NewHealthCheckManager() + + // expected to have an error in msg + healthy, msg := mgr.IsHealthy(volumeID, volumePath) + if !(healthy && msg != nil) { + t.Error("ConditionChecker was not started yet, did not get an error") + } + + t.Log("start the checker") + err := mgr.StartChecker(volumeID, volumePath, StatCheckerType) + if err != nil { + t.Fatalf("ConditionChecker could not get started: %v", err) + } + + t.Log("check health, should be healthy") + healthy, msg = mgr.IsHealthy(volumeID, volumePath) + if !healthy || err != nil { + t.Errorf("volume is unhealthy: %s", msg) + } + + t.Log("stop the checker") + mgr.StopChecker(volumeID, volumePath) +} + +func TestSharedChecker(t *testing.T) { + t.Parallel() + + volumeID := "fake-volume-id" + volumePath := t.TempDir() + mgr := NewHealthCheckManager() + + // expected to have an error in msg + healthy, msg := mgr.IsHealthy(volumeID, volumePath) + if !(healthy && msg != nil) { + t.Error("ConditionChecker was not started yet, did not get an error") + } + + t.Log("start the checker") + err := mgr.StartSharedChecker(volumeID, volumePath, StatCheckerType) + if err != nil { + t.Fatalf("ConditionChecker could not get started: %v", err) + } + + t.Log("check health, should be healthy") + healthy, msg = mgr.IsHealthy(volumeID, volumePath) + if !healthy || err != nil { + t.Errorf("volume is unhealthy: %s", msg) + } + + t.Log("check health, should be healthy, path is ignored") + healthy, msg = mgr.IsHealthy(volumeID, "different-path") + if !healthy || err != nil { + t.Errorf("volume is unhealthy: %s", msg) + } + + t.Log("stop the checker") + mgr.StopSharedChecker(volumeID) +} diff --git a/internal/health-checker/statchecker.go b/internal/health-checker/statchecker.go new file mode 100644 index 00000000000..d5ef6988342 --- /dev/null +++ b/internal/health-checker/statchecker.go @@ -0,0 +1,70 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "os" + "time" +) + +type statChecker struct { + checker + + // dirname points to the directory that is used for checking. + dirname string +} + +func newStatChecker(dir string) ConditionChecker { + sc := &statChecker{ + dirname: dir, + } + sc.initDefaults() + + sc.checker.runChecker = func() { + sc.isRunning = true + + ticker := time.NewTicker(sc.interval) + defer ticker.Stop() + + for { + select { + case <-sc.commands: // STOP command received + sc.isRunning = false + + return + case now := <-ticker.C: + _, err := os.Stat(sc.dirname) + if err != nil { + sc.mutex.Lock() + sc.healthy = false + sc.err = err + sc.mutex.Unlock() + + continue + } + + sc.mutex.Lock() + sc.healthy = true + sc.err = nil + sc.lastUpdate = now + sc.mutex.Unlock() + } + } + } + + return sc +} diff --git a/internal/health-checker/statchecker_test.go b/internal/health-checker/statchecker_test.go new file mode 100644 index 00000000000..4ea247803e9 --- /dev/null +++ b/internal/health-checker/statchecker_test.go @@ -0,0 +1,60 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "testing" + "time" +) + +func TestStatChecker(t *testing.T) { + t.Parallel() + + volumePath := t.TempDir() + sc := newStatChecker(volumePath) + checker, ok := sc.(*statChecker) + if !ok { + t.Errorf("failed to convert fc to *fileChecker: %v", sc) + } + checker.interval = time.Second * 5 + + // start the checker + checker.start() + + // wait a second to get the go routine running + time.Sleep(time.Second) + if !checker.isRunning { + t.Error("checker failed to start") + } + + for i := 0; i < 10; i++ { + // check health, should be healthy + healthy, msg := checker.isHealthy() + if !healthy || msg != nil { + t.Error("volume is unhealthy") + } + + time.Sleep(time.Second) + } + + if !checker.isRunning { + t.Error("runChecker() exited already") + } + + // stop the checker + checker.stop() +} From 0929db31e49809a88a4d5681051e881aeb393bee Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Wed, 13 Sep 2023 16:05:27 +0200 Subject: [PATCH 3/4] cephfs: enable VolumeCondition with new health-checker The HealthChecker is configured to use the Staging path pf the volume, with a `.csi/` subdirectory. In the future this directory could be a directory that is not under the Published directory. Fixes: #4219 Signed-off-by: Niels de Vos --- internal/cephfs/driver.go | 2 ++ internal/cephfs/nodeserver.go | 67 +++++++++++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 2ff56750f82..781c89e9ad3 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -25,6 +25,7 @@ import ( casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs" csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + hc "github.com/ceph/ceph-csi/internal/health-checker" "github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -82,6 +83,7 @@ func NewNodeServer( VolumeLocks: util.NewVolumeLocks(), kernelMountOptions: kernelMountOptions, fuseMountOptions: fuseMountOptions, + healthChecker: hc.NewHealthCheckManager(), } } diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 7287cde7605..94a9cb492c8 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -29,6 +29,7 @@ import ( "github.com/ceph/ceph-csi/internal/cephfs/store" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + hc "github.com/ceph/ceph-csi/internal/health-checker" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/fscrypt" "github.com/ceph/ceph-csi/internal/util/log" @@ -47,6 +48,7 @@ type NodeServer struct { VolumeLocks *util.VolumeLocks kernelMountOptions string fuseMountOptions string + healthChecker hc.Manager } func getCredentialsForVolume( @@ -228,6 +230,8 @@ func (ns *NodeServer) NodeStageVolume( return nil, status.Error(codes.Internal, err.Error()) } + ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil } @@ -270,9 +274,24 @@ func (ns *NodeServer) NodeStageVolume( } } + ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil } +// startSharedHealthChecker starts a health-checker on the stagingTargetPath. +// This checker can be shared between multiple containers. +// +// TODO: start a FileChecker for read-writable volumes that have an app-data subdir. +func (ns *NodeServer) startSharedHealthChecker(ctx context.Context, volumeID, dir string) { + // The StatChecker works for volumes that do not have a dedicated app-data + // subdirectory, or are read-only. + err := ns.healthChecker.StartSharedChecker(volumeID, dir, hc.StatCheckerType) + if err != nil { + log.WarningLog(ctx, "failed to start healthchecker: %v", err) + } +} + func (ns *NodeServer) mount( ctx context.Context, mnt mounter.VolumeMounter, @@ -479,7 +498,8 @@ func (ns *NodeServer) NodePublishVolume( // Ensure staging target path is a mountpoint. - if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil { + isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath) + if err != nil { log.ErrorLog(ctx, "stat failed: %v", err) return nil, status.Error(codes.Internal, err.Error()) @@ -491,7 +511,7 @@ func (ns *NodeServer) NodePublishVolume( // Check if the volume is already mounted - isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) + isMnt, err = util.IsMountPoint(ns.Mounter, targetPath) if err != nil { log.ErrorLog(ctx, "stat failed: %v", err) @@ -545,6 +565,10 @@ func (ns *NodeServer) NodeUnpublishVolume( // considering kubelet make sure node operations like unpublish/unstage...etc can not be called // at same time, an explicit locking at time of nodeunpublish is not required. targetPath := req.GetTargetPath() + + // stop the health-checker that may have been started in NodeGetVolumeStats() + ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath) + isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) if err != nil { log.ErrorLog(ctx, "stat failed: %v", err) @@ -599,6 +623,9 @@ func (ns *NodeServer) NodeUnstageVolume( } volID := req.GetVolumeId() + + ns.healthChecker.StopSharedChecker(volID) + if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired { log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID) @@ -670,6 +697,13 @@ func (ns *NodeServer) NodeGetCapabilities( }, }, }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION, + }, + }, + }, { Type: &csi.NodeServiceCapability_Rpc{ Rpc: &csi.NodeServiceCapability_RPC{ @@ -694,6 +728,35 @@ func (ns *NodeServer) NodeGetVolumeStats( return nil, status.Error(codes.InvalidArgument, err.Error()) } + // health check first, return without stats if unhealthy + healthy, msg := ns.healthChecker.IsHealthy(req.GetVolumeId(), targetPath) + + // If healthy and an error is returned, it means that the checker was not + // started. This could happen when the node-plugin was restarted and the + // volume is already staged and published. + if healthy && msg != nil { + // Start a StatChecker for the mounted targetPath, this prevents + // writing a file in the user-visible location. Ideally a (shared) + // FileChecker is started with the stagingTargetPath, but we can't + // get the stagingPath from the request easily. + // TODO: resolve the stagingPath like rbd.getStagingPath() does + err = ns.healthChecker.StartChecker(req.GetVolumeId(), targetPath, hc.StatCheckerType) + if err != nil { + log.WarningLog(ctx, "failed to start healthchecker: %v", err) + } + } + + // !healthy indicates a problem with the volume + if !healthy { + return &csi.NodeGetVolumeStatsResponse{ + VolumeCondition: &csi.VolumeCondition{ + Abnormal: true, + Message: msg.Error(), + }, + }, nil + } + + // warning: stat() may hang on an unhealthy volume stat, err := os.Stat(targetPath) if err != nil { if util.IsCorruptedMountError(err) { From 34d26d36ff408bc5334e5bd8cbcca79104214fec Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Tue, 17 Oct 2023 13:55:38 +0200 Subject: [PATCH 4/4] util: if FilesystemNodeGetVolumeStats succeeds the volume is healthy When FilesystemNodeGetVolumeStats() succeeds, the volume must be healthy. This can be included in the VolumeCondition CSI message by default. Checks that detect an abnormal VolumeCondition should prevent calling FilesystemNodeGetVolumeStats() as it is possible that the function will hang. Signed-off-by: Niels de Vos --- internal/csi-common/utils.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/csi-common/utils.go b/internal/csi-common/utils.go index a2bb4ab862d..9c333c9a72f 100644 --- a/internal/csi-common/utils.go +++ b/internal/csi-common/utils.go @@ -312,6 +312,12 @@ func FilesystemNodeGetVolumeStats( }) } + // include marker for a healthy volume by default + res.VolumeCondition = &csi.VolumeCondition{ + Abnormal: false, + Message: "volume is in a healthy condition", + } + return res, nil }