diff --git a/pkg/fs/pool.go b/pkg/fs/pool.go index dde167f..e2e4532 100644 --- a/pkg/fs/pool.go +++ b/pkg/fs/pool.go @@ -7,19 +7,22 @@ import ( "github.com/pkg/errors" ) -var ThreadPoolCancelled = errors.Errorf("thread pool cancelled") +var ErrThreadPoolCancelled = errors.Errorf("thread pool cancelled") type ThreadPool struct { - ctx context.Context + ctx context.Context //nolint:containedctx // ignore for now cancel context.CancelFunc n int tasks chan func(context.Context) error err error } +const backlog = 1000 + func NewThreadPool(n int) *ThreadPool { ctx, cancel := context.WithCancel(context.Background()) - return &ThreadPool{ctx, cancel, n, make(chan func(context.Context) error, 1000), nil} + + return &ThreadPool{ctx, cancel, n, make(chan func(context.Context) error, backlog), nil} } func (tp *ThreadPool) Add(f func(context.Context) error) { @@ -33,9 +36,11 @@ func (tp *ThreadPool) DoneAddingJobs() { func (tp *ThreadPool) Run() error { wg := sync.WaitGroup{} wg.Add(tp.n) + for i := 0; i < tp.n; i++ { - go func(i int) { + go func() { defer wg.Done() + for { select { case <-tp.ctx.Done(): @@ -46,16 +51,18 @@ func (tp *ThreadPool) Run() error { } err := f(tp.ctx) - if err != nil && err != ThreadPoolCancelled { + if err != nil && !errors.Is(err, ErrThreadPoolCancelled) { tp.err = err tp.cancel() + return } } } - }(i) + }() } wg.Wait() + return tp.err } diff --git a/pkg/fs/verify.go b/pkg/fs/verify.go index 2545997..4a78538 100644 --- a/pkg/fs/verify.go +++ b/pkg/fs/verify.go @@ -1,9 +1,11 @@ package fs import ( + "context" "fmt" "os" "path/filepath" + "runtime" "strconv" "strings" @@ -94,6 +96,8 @@ func Verify(input, inventory, missing string) error { mdoc.Name = "missing-files-document" mcount := 0 + tpool := NewThreadPool(runtime.NumCPU()) + for _, entry := range inv.Entries { mode, err := strconv.ParseInt(entry.Mode, 8, 32) if err != nil { @@ -106,25 +110,37 @@ func Verify(input, inventory, missing string) error { continue } - if err := checkBOM(input, entry.Path); err != nil { - log.Error().Err(err).Str("path", entry.Path).Msg("inventory verify failed") + tpool.Add(func(ctx context.Context) error { + if err := checkBOM(input, entry.Path); err != nil { + log.Error().Err(err).Str("path", entry.Path).Msg("inventory verify failed") - mcount++ + mcount++ - sfile := spdx.NewFile() - sfile.SetEntity( - &spdx.Entity{ - Name: entry.Path, - Checksum: map[string]string{"SHA256": strings.Split(entry.Checksum, ":")[1]}, - }, - ) + sfile := spdx.NewFile() + sfile.SetEntity( + &spdx.Entity{ + Name: entry.Path, + Checksum: map[string]string{"SHA256": strings.Split(entry.Checksum, ":")[1]}, + }, + ) - if err := mdoc.AddFile(sfile); err != nil { - log.Error().Err(err).Msg("unable to add file to package") + if err := mdoc.AddFile(sfile); err != nil { + log.Error().Err(err).Msg("unable to add file to package") - return err + return err + } } - } + + return nil + }) + } + + tpool.DoneAddingJobs() + + if err := tpool.Run(); err != nil { + log.Error().Err(err).Msg("thread pooling failed") + + return err } if mcount != 0 {