Skip to content

Commit

Permalink
uevent: should respawn the uevent monitor
Browse files Browse the repository at this point in the history
    - NDM could not work w/o uevent monitor
    - add `inject-udev-monitor-error` for testing
    - Inject udev monitor error with CI

Signed-off-by: Vicente Cheng <[email protected]>
  • Loading branch information
Vicente-Cheng committed Jan 24, 2024
1 parent 44f601c commit f70cfc9
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 30 deletions.
5 changes: 4 additions & 1 deletion ci/charts/ndm-override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ image:
tag: ""

autoProvisionFilter: [/dev/sd*]
debug: true
debug: true

# we only manually inject udev monitor error once, so we can test it in CI.
injectUdevMonitorError: true
11 changes: 9 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ func main() {
DefaultText: "5",
Destination: &opt.MaxConcurrentOps,
},
&cli.BoolFlag{
Name: "inject-udev-monitor-error",
EnvVars: []string{"NDM_INJECT_UDEV_MONITOR_ERROR"},
Usage: "Inject error when monitoring udev events",
Value: false,
Destination: &opt.InjectUdevMonitorError,
},
}

app.Action = func(c *cli.Context) error {
Expand Down Expand Up @@ -171,8 +178,8 @@ func initLogs(opt *option.Option) {
logrus.SetOutput(os.Stdout)
logrus.Infof("Node Disk Manager %s is starting", version.FriendlyVersion())
logrus.Infof("Notable parameters are following:")
logrus.Infof("Namespace: %s, ConcurrentOps: %d, RescanInterval: %d",
opt.Namespace, opt.MaxConcurrentOps, opt.RescanInterval)
logrus.Infof("Namespace: %s, ConcurrentOps: %d, RescanInterval: %d, InjectUdevMonitorError: %v",
opt.Namespace, opt.MaxConcurrentOps, opt.RescanInterval, opt.InjectUdevMonitorError)
if opt.Debug {
logrus.SetLevel(logrus.DebugLevel)
logrus.Debugf("Loglevel set to [%v]", logrus.DebugLevel)
Expand Down
21 changes: 11 additions & 10 deletions pkg/option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ type Option struct {
NodeName string
Threadiness int

Debug bool
Trace bool
LogFormat string
ProfilerAddress string
VendorFilter string
PathFilter string
LabelFilter string
AutoProvisionFilter string
RescanInterval int64
MaxConcurrentOps uint
Debug bool
Trace bool
LogFormat string
ProfilerAddress string
VendorFilter string
PathFilter string
LabelFilter string
AutoProvisionFilter string
RescanInterval int64
MaxConcurrentOps uint
InjectUdevMonitorError bool
}
59 changes: 42 additions & 17 deletions pkg/udev/uevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,45 @@ import (
)

type Udev struct {
namespace string
nodeName string
startOnce sync.Once
scanner *blockdevice.Scanner
namespace string
nodeName string
startOnce sync.Once
scanner *blockdevice.Scanner
injectError bool
}

func NewUdev(opt *option.Option, scanner *blockdevice.Scanner) *Udev {
return &Udev{
startOnce: sync.Once{},
namespace: opt.Namespace,
nodeName: opt.NodeName,
scanner: scanner,
startOnce: sync.Once{},
namespace: opt.Namespace,
nodeName: opt.NodeName,
scanner: scanner,
injectError: opt.InjectUdevMonitorError,
}
}

func (u *Udev) Monitor(ctx context.Context) {
u.startOnce.Do(func() {
u.monitor(ctx)
})
// we need to respawn the monitor with any error.
// because any error will break the monitor loop.
errChan := make(chan error)
go u.sapwnMonitor(ctx, errChan)

}

func (u *Udev) monitor(ctx context.Context) {
func (u *Udev) sapwnMonitor(ctx context.Context, errChan chan error) {
go u.monitor(ctx, errChan)
for {
select {
case err := <-errChan:
logrus.Errorf("failed to monitor udev events, error: %s", err.Error())
go u.monitor(ctx, errChan)
case <-ctx.Done():
return
}
}
}

func (u *Udev) monitor(ctx context.Context, errors chan error) {
logrus.Infoln("Start monitoring udev processed events")

matcher, err := getOptionalMatcher(nil)
Expand All @@ -55,16 +72,24 @@ func (u *Udev) monitor(ctx context.Context) {
defer conn.Close()

uqueue := make(chan netlink.UEvent)
errors := make(chan error)
quit := conn.Monitor(uqueue, errors, matcher)

errChan := make(chan error)
quit := conn.Monitor(uqueue, errChan, matcher)

// simulator the error from udev monitor
if u.injectError {
logrus.Infof("Injecting error to udev monitor for testing")
errors <- fmt.Errorf("testing error")
u.injectError = false
return
}
// Handling message from udev queue
for {
select {
case uevent := <-uqueue:
u.ActionHandler(uevent)
case err := <-errors:
logrus.Errorf("failed to parse udev event, error: %s", err.Error())
case err := <-errChan:
errors <- err
return
case <-ctx.Done():
close(quit)
return
Expand Down

0 comments on commit f70cfc9

Please sign in to comment.