Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uevent: should respawn the uevent monitor #76

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ci/charts/ndm-override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ image:
tag: ""

autoProvisionFilter: [/dev/sd*]
debug: true
debug: true

# Manually inject a udev monitor error (only once) so that the monitor respawn can be tested in CI.
injectUdevMonitorError: true
11 changes: 9 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ func main() {
DefaultText: "5",
Destination: &opt.MaxConcurrentOps,
},
&cli.BoolFlag{
Name: "inject-udev-monitor-error",
EnvVars: []string{"NDM_INJECT_UDEV_MONITOR_ERROR"},
Usage: "Inject error when monitoring udev events",
Value: false,
Destination: &opt.InjectUdevMonitorError,
},
}

app.Action = func(c *cli.Context) error {
Expand Down Expand Up @@ -171,8 +178,8 @@ func initLogs(opt *option.Option) {
logrus.SetOutput(os.Stdout)
logrus.Infof("Node Disk Manager %s is starting", version.FriendlyVersion())
logrus.Infof("Notable parameters are following:")
logrus.Infof("Namespace: %s, ConcurrentOps: %d, RescanInterval: %d",
opt.Namespace, opt.MaxConcurrentOps, opt.RescanInterval)
logrus.Infof("Namespace: %s, ConcurrentOps: %d, RescanInterval: %d, InjectUdevMonitorError: %v",
opt.Namespace, opt.MaxConcurrentOps, opt.RescanInterval, opt.InjectUdevMonitorError)
if opt.Debug {
logrus.SetLevel(logrus.DebugLevel)
logrus.Debugf("Loglevel set to [%v]", logrus.DebugLevel)
Expand Down
21 changes: 11 additions & 10 deletions pkg/option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ type Option struct {
NodeName string
Threadiness int

Debug bool
Trace bool
LogFormat string
ProfilerAddress string
VendorFilter string
PathFilter string
LabelFilter string
AutoProvisionFilter string
RescanInterval int64
MaxConcurrentOps uint
Debug bool
Trace bool
LogFormat string
ProfilerAddress string
VendorFilter string
PathFilter string
LabelFilter string
AutoProvisionFilter string
RescanInterval int64
MaxConcurrentOps uint
InjectUdevMonitorError bool
}
59 changes: 42 additions & 17 deletions pkg/udev/uevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,45 @@ import (
)

type Udev struct {
namespace string
nodeName string
startOnce sync.Once
scanner *blockdevice.Scanner
namespace string
nodeName string
startOnce sync.Once
scanner *blockdevice.Scanner
injectError bool
}

func NewUdev(opt *option.Option, scanner *blockdevice.Scanner) *Udev {
return &Udev{
startOnce: sync.Once{},
namespace: opt.Namespace,
nodeName: opt.NodeName,
scanner: scanner,
startOnce: sync.Once{},
namespace: opt.Namespace,
nodeName: opt.NodeName,
scanner: scanner,
injectError: opt.InjectUdevMonitorError,
}
}

func (u *Udev) Monitor(ctx context.Context) {
u.startOnce.Do(func() {
u.monitor(ctx)
})
// We need to respawn the monitor on any error,
// because any error will break the monitor loop.
errChan := make(chan error)
go u.spawnMonitor(ctx, errChan)

}

func (u *Udev) monitor(ctx context.Context) {
// spawnMonitor starts u.monitor in its own goroutine and then waits.
// Every error received on errChan is logged and followed by a fresh
// u.monitor goroutine; the function returns once ctx is done.
func (u *Udev) spawnMonitor(ctx context.Context, errChan chan error) {
	for {
		// Launch (or relaunch) the monitor, then block until it reports an
		// error or the context ends.
		go u.monitor(ctx, errChan)

		select {
		case <-ctx.Done():
			return
		case monitorErr := <-errChan:
			logrus.Errorf("failed to monitor udev events, error: %s", monitorErr.Error())
		}
	}
}

func (u *Udev) monitor(ctx context.Context, errors chan error) {
logrus.Infoln("Start monitoring udev processed events")

matcher, err := getOptionalMatcher(nil)
Expand All @@ -55,16 +72,24 @@ func (u *Udev) monitor(ctx context.Context) {
defer conn.Close()

uqueue := make(chan netlink.UEvent)
errors := make(chan error)
quit := conn.Monitor(uqueue, errors, matcher)

errChan := make(chan error)
quit := conn.Monitor(uqueue, errChan, matcher)

// Simulate an error from the udev monitor.
if u.injectError {
logrus.Infof("Injecting error to udev monitor for testing")
errors <- fmt.Errorf("testing error")
u.injectError = false
return
}
// Handling message from udev queue
for {
select {
case uevent := <-uqueue:
u.ActionHandler(uevent)
case err := <-errors:
logrus.Errorf("failed to parse udev event, error: %s", err.Error())
case err := <-errChan:
errors <- err
return
Vicente-Cheng marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
close(quit)
return
Expand Down
Loading