Skip to content

Commit

Permalink
fix: addendum
Browse files Browse the repository at this point in the history
Signed-off-by: Ramkumar Chinchani <[email protected]>
  • Loading branch information
rchincha committed Feb 21, 2024
1 parent b82a95d commit 626f3c0
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
19 changes: 13 additions & 6 deletions pkg/fs/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ import (
"github.com/pkg/errors"
)

var ThreadPoolCancelled = errors.Errorf("thread pool cancelled")
var ErrThreadPoolCancelled = errors.Errorf("thread pool cancelled")

type ThreadPool struct {
ctx context.Context
ctx context.Context //nolint:containedctx // ignore for now
cancel context.CancelFunc
n int
tasks chan func(context.Context) error
err error
}

const backlog = 1000

func NewThreadPool(n int) *ThreadPool {
ctx, cancel := context.WithCancel(context.Background())
return &ThreadPool{ctx, cancel, n, make(chan func(context.Context) error, 1000), nil}

return &ThreadPool{ctx, cancel, n, make(chan func(context.Context) error, backlog), nil}
}

func (tp *ThreadPool) Add(f func(context.Context) error) {
Expand All @@ -33,9 +36,11 @@ func (tp *ThreadPool) DoneAddingJobs() {
func (tp *ThreadPool) Run() error {
wg := sync.WaitGroup{}
wg.Add(tp.n)

for i := 0; i < tp.n; i++ {
go func(i int) {
go func() {
defer wg.Done()

for {
select {
case <-tp.ctx.Done():
Expand All @@ -46,16 +51,18 @@ func (tp *ThreadPool) Run() error {
}

err := f(tp.ctx)
if err != nil && err != ThreadPoolCancelled {
if err != nil && !errors.Is(err, ErrThreadPoolCancelled) {
tp.err = err
tp.cancel()

return
}
}
}
}(i)
}()
}

wg.Wait()

return tp.err
}
44 changes: 30 additions & 14 deletions pkg/fs/verify.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package fs

import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"

Expand Down Expand Up @@ -94,6 +96,8 @@ func Verify(input, inventory, missing string) error {
mdoc.Name = "missing-files-document"
mcount := 0

tpool := NewThreadPool(runtime.NumCPU())

for _, entry := range inv.Entries {
mode, err := strconv.ParseInt(entry.Mode, 8, 32)
if err != nil {
Expand All @@ -106,25 +110,37 @@ func Verify(input, inventory, missing string) error {
continue
}

if err := checkBOM(input, entry.Path); err != nil {
log.Error().Err(err).Str("path", entry.Path).Msg("inventory verify failed")
tpool.Add(func(ctx context.Context) error {
if err := checkBOM(input, entry.Path); err != nil {
log.Error().Err(err).Str("path", entry.Path).Msg("inventory verify failed")

mcount++
mcount++

sfile := spdx.NewFile()
sfile.SetEntity(
&spdx.Entity{
Name: entry.Path,
Checksum: map[string]string{"SHA256": strings.Split(entry.Checksum, ":")[1]},
},
)
sfile := spdx.NewFile()
sfile.SetEntity(
&spdx.Entity{
Name: entry.Path,
Checksum: map[string]string{"SHA256": strings.Split(entry.Checksum, ":")[1]},
},
)

if err := mdoc.AddFile(sfile); err != nil {
log.Error().Err(err).Msg("unable to add file to package")
if err := mdoc.AddFile(sfile); err != nil {
log.Error().Err(err).Msg("unable to add file to package")

return err
return err
}
}
}

return nil
})
}

tpool.DoneAddingJobs()

if err := tpool.Run(); err != nil {
log.Error().Err(err).Msg("thread pooling failed")

return err
}

if mcount != 0 {
Expand Down

0 comments on commit 626f3c0

Please sign in to comment.