Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
util: add health-checker for periodic filesystem checks
Browse files Browse the repository at this point in the history
Signed-off-by: Niels de Vos <ndevos@ibm.com>
nixpanic committed Oct 17, 2023
1 parent e7ce294 commit 4b47137
Showing 4 changed files with 413 additions and 0 deletions.
174 changes: 174 additions & 0 deletions internal/health-checker/filechecker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthchecker

import (
"errors"
"os"
"path"
"time"
)

// command is what is sent through the channel to terminate the go routine.
type command string

const (
// stopCommand is sent through the channel to stop checking.
stopCommand = command("STOP")
)

type fileChecker struct {
// filename contains the filename that is used for checking.
filename string

// interval contains the time to sleep between health checks.
interval time.Duration

// current status
isRunning bool
healthy bool
err error
lastUpdate time.Time

// commands is the channel to read commands from; when to stop.
commands chan command
}

func newFileChecker(dir string) ConditionChecker {
return &fileChecker{
filename: path.Join(dir, "csi-volume-condition.ts"),
healthy: true,
interval: 120 * time.Second,
lastUpdate: time.Now(),
commands: make(chan command),
}
}

// runChecker is an endless loop that writes a timestamp and reads it back from
// a file.
func (fc *fileChecker) runChecker() {
fc.isRunning = true

for {
if fc.shouldStop() {
fc.isRunning = false

return
}

now := time.Now()

err := fc.writeTimestamp(now)
if err != nil {
fc.healthy = false
fc.err = err

continue
}

ts, err := fc.readTimestamp()
if err != nil {
fc.healthy = false
fc.err = err

continue
}

// verify that the written timestamp is read back
if now.Compare(ts) != 0 {
fc.healthy = false
fc.err = errors.New("timestamp read from file does not match what was written")

continue
}

// run health check, write a timestamp to a file, read it back
fc.healthy = true
fc.err = nil
fc.lastUpdate = ts
}
}

func (fc *fileChecker) shouldStop() bool {
start := time.Now()

for {
// check if we slept long enough to run a next check
slept := time.Since(start)
if slept >= fc.interval {
break
}

select {
case <-fc.commands:
// a command was reveived, need to stop checking
return true
default:
// continue with checking
}

time.Sleep(time.Second)
}

return false
}

func (fc *fileChecker) start() {
go fc.runChecker()
}

func (fc *fileChecker) stop() {
fc.commands <- stopCommand
}

func (fc *fileChecker) isHealthy() (bool, error) {
// check for the last update, it should be within a certain number of
//
// lastUpdate + (N x fc.interval)
//
// Without such a check, a single slow write/read could trigger actions
// to recover an unhealthy volume already.
//
// It is required to check, in case the write or read in the go routine
// is blocked.
return fc.healthy, fc.err
}

// readTimestamp reads the JSON formatted timestamp from the file.
func (fc *fileChecker) readTimestamp() (time.Time, error) {
var ts time.Time

data, err := os.ReadFile(fc.filename)
if err != nil {
return ts, err
}

err = ts.UnmarshalJSON(data)

return ts, err
}

// writeTimestamp writes the timestamp to the file in JSON format.
func (fc *fileChecker) writeTimestamp(ts time.Time) error {
data, err := ts.MarshalJSON()
if err != nil {
return err
}

//nolint:gosec // allow reading of the timestamp for debugging
return os.WriteFile(fc.filename, data, 0o644)
}
78 changes: 78 additions & 0 deletions internal/health-checker/filechecker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthchecker

import (
"testing"
"time"
)

func TestFileChecker(t *testing.T) {
t.Parallel()

volumePath := t.TempDir()
fc := newFileChecker(volumePath)
checker, ok := fc.(*fileChecker)
if !ok {
t.Errorf("failed to convert fc to *fileChecker: %v", fc)
}
checker.interval = time.Second * 5

// start the checker
checker.start()

// wait a second to get the go routine running
time.Sleep(time.Second)
if !checker.isRunning {
t.Error("checker failed to start")
}

for i := 0; i < 10; i++ {
// check health, should be healthy
healthy, msg := checker.isHealthy()
if !healthy || msg != nil {
t.Error("volume is unhealthy")
}

time.Sleep(time.Second)
}

// stop the checker
checker.stop()
}

func TestWriteReadTimestamp(t *testing.T) {
t.Parallel()

volumePath := t.TempDir()
fc := newFileChecker(volumePath)
checker, ok := fc.(*fileChecker)
if !ok {
t.Errorf("failed to convert fc to *fileChecker: %v", fc)
}
ts := time.Now()

err := checker.writeTimestamp(ts)
if err != nil {
t.Fatalf("failed to write timestamp: %v", err)
}

_, err = checker.readTimestamp()
if err != nil {
t.Fatalf("failed to read timestamp: %v", err)
}
}
112 changes: 112 additions & 0 deletions internal/health-checker/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthchecker

import (
"fmt"
"sync"
)

// Manager provides the API for getting the health status of a volume. The main
// usage is requesting the health status by path.
//
// When the Manager detects that a new path is used for checking, a new
// instance of a ConditionChecker is created for the path, and started.
//
// Once the path is not active anymore (when NodeUnstageVolume is called), the
// ConditionChecker needs to be stopped, which can be done by
// Manager.StopChecker().
type Manager interface {
StartChecker(path string) error
StopChecker(path string)
IsHealthy(path string) (bool, error)
}

// ConditionChecker describes the interface that a health status reporter needs
// to implement. It is used internally by the Manager only.
type ConditionChecker interface {
// start runs a the health checking function in a new go routine.
start()

// stop terminates a the health checking function that runs in a go
// routine.
stop()

// isHealthy returns the status of the volume, without blocking.
isHealthy() (bool, error)
}

type healthCheckManager struct {
checkers sync.Map // map[path]ConditionChecker
}

func NewHealthCheckManager() Manager {
return &healthCheckManager{
checkers: sync.Map{},
}
}

func (hcm *healthCheckManager) StartChecker(path string) error {
cc := newFileChecker(path)

// load the 'old' ConditionChecker if it exists, otherwuse store 'cc'
old, ok := hcm.checkers.LoadOrStore(path, cc)
if ok {
// 'old' was loaded, cast it to ConditionChecker
_, ok = old.(ConditionChecker)
if !ok {
return fmt.Errorf("failed to cast cc to ConditionChecker for path %q", path)
}
} else {
// 'cc' was stored, start it only once
cc.start()
}

return nil
}

func (hcm *healthCheckManager) StopChecker(path string) {
old, ok := hcm.checkers.LoadAndDelete(path)
if !ok {
// nothing was loaded, nothing to do
return
}

// 'old' was loaded, cast it to ConditionChecker
cc, ok := old.(ConditionChecker)
if !ok {
// failed to cast, should not be possible
return
}
cc.stop()
}

func (hcm *healthCheckManager) IsHealthy(path string) (bool, error) {
// load the 'old' ConditionChecker if it exists
old, ok := hcm.checkers.Load(path)
if !ok {
return true, fmt.Errorf("no ConditionChecker for path: %s", path)
}

// 'old' was loaded, cast it to ConditionChecker
cc, ok := old.(ConditionChecker)
if !ok {
return true, fmt.Errorf("failed to cast cc to ConditionChecker for path %q", path)
}

return cc.isHealthy()
}
49 changes: 49 additions & 0 deletions internal/health-checker/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthchecker

import (
"testing"
)

func TestManager(t *testing.T) {
t.Parallel()

volumePath := t.TempDir()
mgr := NewHealthCheckManager()

// expected to have an error in msg
healthy, msg := mgr.IsHealthy(volumePath)
if !(healthy && msg != nil) {
t.Error("ConditionChecker was not started yet, did not get an error")
}

t.Log("start the checker")
err := mgr.StartChecker(volumePath)
if err != nil {
t.Fatalf("ConditionChecker could not get started: %v", err)
}

t.Log("check health, should be healthy")
healthy, msg = mgr.IsHealthy(volumePath)
if !healthy || err != nil {
t.Errorf("volume is unhealthy: %s", msg)
}

t.Log("stop the checker")
mgr.StopChecker(volumePath)
}

0 comments on commit 4b47137

Please sign in to comment.