diff --git a/pkg/fs/tpool.go b/pkg/fs/tpool.go new file mode 100644 index 0000000..1377c30 --- /dev/null +++ b/pkg/fs/tpool.go @@ -0,0 +1,61 @@ +package fs + +import ( + "sync" +) + +type ThreadPool interface { + // Add adds a task for the threadpool to consume + Add(taskfn func() error) + + // Done finishes the threadpool + Done() error +} + +type threadpool struct { + wg sync.WaitGroup + n int + backlog int + tasks chan func() error + err error +} + +func NewThreadPool(n, backlog int) *threadpool { + pool := &threadpool{n: n, backlog: backlog, tasks: make(chan func() error, backlog)} + + for i := 0; i < n; i++ { + // start the runners + pool.wg.Add(1) + go pool.runner() + } + + return pool +} + +func (pool *threadpool) Add(f func() error) { + pool.tasks <- f +} + +func (pool *threadpool) runner() { + defer pool.wg.Done() + + for { + taskfn, ok := <-pool.tasks + if !ok { + // no more tasks + return + } + + // ignore failures, save error + if err := taskfn(); err != nil { + pool.err = err + } + } +} + +func (pool *threadpool) Done() error { + close(pool.tasks) + pool.wg.Wait() + + return pool.err +} diff --git a/pkg/fs/verify.go b/pkg/fs/verify.go index 2545997..c4b4fe0 100644 --- a/pkg/fs/verify.go +++ b/pkg/fs/verify.go @@ -4,8 +4,10 @@ import ( "fmt" "os" "path/filepath" + "runtime" "strconv" "strings" + "sync/atomic" "github.com/rs/zerolog/log" "sigs.k8s.io/bom/pkg/spdx" @@ -92,7 +94,11 @@ func Verify(input, inventory, missing string) error { mdoc := bom.NewDocument("", "") mdoc.Name = "missing-files-document" - mcount := 0 + + var mcount atomic.Uint64 + + backlog := 1024 + tpool := NewThreadPool(runtime.NumCPU(), backlog) for _, entry := range inv.Entries { mode, err := strconv.ParseInt(entry.Mode, 8, 32) @@ -106,28 +112,47 @@ func Verify(input, inventory, missing string) error { continue } - if err := checkBOM(input, entry.Path); err != nil { - log.Error().Err(err).Str("path", entry.Path).Msg("inventory verify failed") + tpool.Add( + func(entry Entry) func() error { + taskfn := func() error { + if err := checkBOM(input, entry.Path); err != nil { + log.Error().Err(err).Str("path", entry.Path).Interface("entry", entry).Msg("inventory verify failed") - mcount++ + mcount.Add(1) - sfile := spdx.NewFile() - sfile.SetEntity( - &spdx.Entity{ - Name: entry.Path, - Checksum: map[string]string{"SHA256": strings.Split(entry.Checksum, ":")[1]}, - }, - ) + sfile := spdx.NewFile() + sfile.SetEntity( + &spdx.Entity{ + Name: entry.Path, + Checksum: map[string]string{"SHA256": strings.Split(entry.Checksum, ":")[1]}, + }, + ) - if err := mdoc.AddFile(sfile); err != nil { - log.Error().Err(err).Msg("unable to add file to package") + if err := mdoc.AddFile(sfile); err != nil { + log.Error().Err(err).Msg("unable to add file to package") - return err - } - } + return err + } + } + + return nil + } + + return taskfn + }(entry), + ) + } + + // finish with the threadpool + if err := tpool.Done(); err != nil { + log.Error().Err(err).Msg("threadpool failed") + + return err } - if mcount != 0 { + count := mcount.Load() + + if count != 0 { if missing != "" { if err := bom.WriteDocument(mdoc, missing); err != nil { log.Error().Err(err).Str("path", missing).Msg("unable to writing missing entries") @@ -136,7 +161,7 @@ func Verify(input, inventory, missing string) error { } } - return fmt.Errorf("%w: %d entries missing", errors.ErrIncomplete, mcount) + return fmt.Errorf("%w: %d entries missing", errors.ErrIncomplete, count) } return nil