From f70cfc9ddbfb05220ff1ba3410545c0adbca12e9 Mon Sep 17 00:00:00 2001 From: Vicente Cheng Date: Wed, 10 Jan 2024 23:37:42 +0800 Subject: [PATCH] uevent: should respawn the uevent monitor - NDM could not work w/o uevent monitor - add `inject-udev-monitor-error` for testing - Inject udev monitor error with CI Signed-off-by: Vicente Cheng --- ci/charts/ndm-override.yaml | 5 +++- main.go | 11 +++++-- pkg/option/option.go | 21 ++++++------- pkg/udev/uevent.go | 59 ++++++++++++++++++++++++++----------- 4 files changed, 66 insertions(+), 30 deletions(-) diff --git a/ci/charts/ndm-override.yaml b/ci/charts/ndm-override.yaml index e0fd1c25..c11140a2 100644 --- a/ci/charts/ndm-override.yaml +++ b/ci/charts/ndm-override.yaml @@ -5,4 +5,7 @@ image: tag: "" autoProvisionFilter: [/dev/sd*] -debug: true \ No newline at end of file +debug: true + +# we only manually inject udev monitor error once, so we can test it in CI. +injectUdevMonitorError: true \ No newline at end of file diff --git a/main.go b/main.go index f040c31a..16ffd074 100644 --- a/main.go +++ b/main.go @@ -133,6 +133,13 @@ func main() { DefaultText: "5", Destination: &opt.MaxConcurrentOps, }, + &cli.BoolFlag{ + Name: "inject-udev-monitor-error", + EnvVars: []string{"NDM_INJECT_UDEV_MONITOR_ERROR"}, + Usage: "Inject error when monitoring udev events", + Value: false, + Destination: &opt.InjectUdevMonitorError, + }, } app.Action = func(c *cli.Context) error { @@ -171,8 +178,8 @@ func initLogs(opt *option.Option) { logrus.SetOutput(os.Stdout) logrus.Infof("Node Disk Manager %s is starting", version.FriendlyVersion()) logrus.Infof("Notable parameters are following:") - logrus.Infof("Namespace: %s, ConcurrentOps: %d, RescanInterval: %d", - opt.Namespace, opt.MaxConcurrentOps, opt.RescanInterval) + logrus.Infof("Namespace: %s, ConcurrentOps: %d, RescanInterval: %d, InjectUdevMonitorError: %v", + opt.Namespace, opt.MaxConcurrentOps, opt.RescanInterval, opt.InjectUdevMonitorError) if opt.Debug { logrus.SetLevel(logrus.DebugLevel) logrus.Debugf("Loglevel set to [%v]", logrus.DebugLevel) diff --git a/pkg/option/option.go b/pkg/option/option.go index 6ac67d19..c797ad28 100644 --- a/pkg/option/option.go +++ b/pkg/option/option.go @@ -6,14 +6,15 @@ type Option struct { NodeName string Threadiness int - Debug bool - Trace bool - LogFormat string - ProfilerAddress string - VendorFilter string - PathFilter string - LabelFilter string - AutoProvisionFilter string - RescanInterval int64 - MaxConcurrentOps uint + Debug bool + Trace bool + LogFormat string + ProfilerAddress string + VendorFilter string + PathFilter string + LabelFilter string + AutoProvisionFilter string + RescanInterval int64 + MaxConcurrentOps uint + InjectUdevMonitorError bool } diff --git a/pkg/udev/uevent.go b/pkg/udev/uevent.go index 71d38558..ac8f2f2f 100644 --- a/pkg/udev/uevent.go +++ b/pkg/udev/uevent.go @@ -19,28 +19,45 @@ import ( ) type Udev struct { - namespace string - nodeName string - startOnce sync.Once - scanner *blockdevice.Scanner + namespace string + nodeName string + startOnce sync.Once + scanner *blockdevice.Scanner + injectError bool } func NewUdev(opt *option.Option, scanner *blockdevice.Scanner) *Udev { return &Udev{ - startOnce: sync.Once{}, - namespace: opt.Namespace, - nodeName: opt.NodeName, - scanner: scanner, + startOnce: sync.Once{}, + namespace: opt.Namespace, + nodeName: opt.NodeName, + scanner: scanner, + injectError: opt.InjectUdevMonitorError, } } func (u *Udev) Monitor(ctx context.Context) { - u.startOnce.Do(func() { - u.monitor(ctx) - }) + // we need to respawn the monitor with any error. + // because any error will break the monitor loop. + errChan := make(chan error) + go u.sapwnMonitor(ctx, errChan) + } -func (u *Udev) monitor(ctx context.Context) { +func (u *Udev) sapwnMonitor(ctx context.Context, errChan chan error) { + go u.monitor(ctx, errChan) + for { + select { + case err := <-errChan: + logrus.Errorf("failed to monitor udev events, error: %s", err.Error()) + go u.monitor(ctx, errChan) + case <-ctx.Done(): + return + } + } +} + +func (u *Udev) monitor(ctx context.Context, errors chan error) { logrus.Infoln("Start monitoring udev processed events") matcher, err := getOptionalMatcher(nil) @@ -55,16 +72,24 @@ func (u *Udev) monitor(ctx context.Context) { defer conn.Close() uqueue := make(chan netlink.UEvent) - errors := make(chan error) - quit := conn.Monitor(uqueue, errors, matcher) - + errChan := make(chan error) + quit := conn.Monitor(uqueue, errChan, matcher) + + // simulator the error from udev monitor + if u.injectError { + logrus.Infof("Injecting error to udev monitor for testing") + errors <- fmt.Errorf("testing error") + u.injectError = false + return + } // Handling message from udev queue for { select { case uevent := <-uqueue: u.ActionHandler(uevent) - case err := <-errors: - logrus.Errorf("failed to parse udev event, error: %s", err.Error()) + case err := <-errChan: + errors <- err + return case <-ctx.Done(): close(quit) return