diff --git a/chainio/dispatcher.go b/chainio/dispatcher.go index 244a3ac8f7..87bc21fbaa 100644 --- a/chainio/dispatcher.go +++ b/chainio/dispatcher.go @@ -9,6 +9,7 @@ import ( "github.com/btcsuite/btclog/v2" "github.com/lightningnetwork/lnd/chainntnfs" + "golang.org/x/sync/errgroup" ) // DefaultProcessBlockTimeout is the timeout value used when waiting for one @@ -229,34 +230,30 @@ func DispatchSequential(b Blockbeat, consumers []Consumer) error { // It requires the consumer to finish processing the block within the specified // time, otherwise a timeout error is returned. func DispatchConcurrent(b Blockbeat, consumers []Consumer) error { - // errChans is a map of channels that will be used to receive errors - // returned from notifying the consumers. - errChans := make(map[string]chan error, len(consumers)) + eg := &errgroup.Group{} // Notify each queue in goroutines. for _, c := range consumers { - // Create a signal chan. - errChan := make(chan error, 1) - errChans[c.Name()] = errChan - // Notify each consumer concurrently. - go func(c Consumer, beat Blockbeat) { + eg.Go(func() error { // Send the beat to the consumer. - errChan <- notifyAndWait( - b, c, DefaultProcessBlockTimeout, - ) - }(c, b) - } + err := notifyAndWait(b, c, DefaultProcessBlockTimeout) + + // Exit early if there's no error. + if err == nil { + return nil + } - // Wait for all consumers in each queue to finish. - for name, errChan := range errChans { - err := <-errChan - if err != nil { b.logger().Errorf("Consumer=%v failed to process "+ - "block: %v", name, err) + "block: %v", c.Name(), err) return err - } + }) + } + + // Wait for all consumers in each queue to finish. + if err := eg.Wait(); err != nil { + return err } return nil