diff --git a/cmd/ovm/main.go b/cmd/ovm/main.go index 96b0960..efa9074 100644 --- a/cmd/ovm/main.go +++ b/cmd/ovm/main.go @@ -68,17 +68,9 @@ func main() { exit(1) } - { - if err := event.Init(opt); err != nil { - _ = log.Errorf("event init error: %v", err) - exit(1) - } - - g := errgroup.Group{} - event.Subscribe(&g) - cleans = append(cleans, func() { - _ = g.Wait() - }) + if err := event.Setup(opt); err != nil { + _ = log.Errorf("event init error: %v", err) + exit(1) } agent, err := sshagentsock.Start(opt.SSHAuthSocketPath, log) diff --git a/pkg/ipc/event/event.go b/pkg/ipc/event/event.go index 5d97b38..364ce46 100644 --- a/pkg/ipc/event/event.go +++ b/pkg/ipc/event/event.go @@ -14,7 +14,6 @@ import ( "github.com/Code-Hex/go-infinity-channel" "github.com/oomol-lab/ovm/pkg/cli" "github.com/oomol-lab/ovm/pkg/logger" - "golang.org/x/sync/errgroup" ) type key string @@ -48,7 +47,10 @@ type event struct { var e *event -func Init(opt *cli.Context) error { +// see: https://github.com/Code-Hex/go-infinity-channel/issues/1 +var waitDone = make(chan struct{}) + +func Setup(opt *cli.Context) error { log, err := logger.New(opt.LogPath, opt.Name+"-event") if err != nil { return err @@ -75,15 +77,7 @@ func Init(opt *cli.Context) error { channel: infinity.NewChannel[*datum](), } - return nil -} - -func Subscribe(g *errgroup.Group) { - if e == nil { - return - } - - g.Go(func() error { + go func() { for datum := range e.channel.Out() { uri := fmt.Sprintf("http://ovm/notify?event=%s&message=%s", datum.name, url.QueryEscape(datum.message)) e.log.Infof("notify %s event to %s", datum.name, uri) @@ -98,14 +92,13 @@ func Subscribe(g *errgroup.Group) { } if datum.message == string(Exit) { - e.channel.Close() - e = nil - return nil + waitDone <- struct{}{} + return } } + }() - return nil - }) + return nil } func NotifyApp(name app) { @@ -117,6 +110,14 @@ func NotifyApp(name app) { name: kApp, message: string(name), } + + // wait for the event to be processed + // Exit event indicates the main process exit + if string(name) == string(Exit) { + <-waitDone + close(waitDone) + e.channel.Close() + } } func NotifyError(err error) {