Skip to content

Commit

Permalink
feat: changed returned type and added check for in progress tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
ilkamo committed Dec 26, 2024
1 parent bca6c25 commit d54e8d4
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 21 deletions.
39 changes: 19 additions & 20 deletions polipo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ type Task[T any] func() (T, error)
// Polipo stores a list of Tasks to be run concurrently.
type Polipo[T any] struct {
sync.RWMutex
tasks []Task[T]
maxConcurrency int
concurrencyBuffer chan struct{}
processing bool // processing is used to prevent adding tasks while Do is running.
tasks []Task[T] // tasks is a list of Tasks to be run concurrently.
maxConcurrency int // maxConcurrency is the maximum number of concurrent tasks to run.
concurrencyBuffer chan struct{} // concurrencyBuffer is used to limit the number of concurrent tasks.
processing bool // processing is used to prevent adding tasks while Do is running.
}

// NewPolipo creates a new Polipo. It accepts options to configure the Polipo.
Expand Down Expand Up @@ -65,6 +65,10 @@ func (p *Polipo[T]) AddTask(task Task[T]) error {
// set by passing `WithMaxConcurrency` as an option. The default is 10.
// This is a blocking function that will return when all the Tasks have finished their work.
func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) {
if len(p.tasks) == 0 {
return nil, errors.New("no tasks to do")
}

if p.processing {
return nil, errors.New("already processing tasks")
}
Expand All @@ -77,11 +81,7 @@ func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) {

p.processing = true

if len(p.tasks) == 0 {
return nil, errors.New("no tasks to do")
}

resultsChan := make(chan result[T])
processedChan := make(chan processed[T])
wg := sync.WaitGroup{}

wg.Add(len(p.tasks))
Expand All @@ -94,10 +94,10 @@ func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) {

go func(t Task[T]) {
defer wg.Done()
items, err := t()
result, err := t()

select {
case resultsChan <- result[T]{items, err}:
case processedChan <- processed[T]{result, err}:
case <-ctx.Done():
}

Expand All @@ -110,7 +110,7 @@ func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) {
// Wait for all tasks to finish.
go func() {
wg.Wait()
close(resultsChan)
close(processedChan)
}()

var (
Expand All @@ -127,24 +127,23 @@ func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) {
}

select {
case res, ok := <-resultsChan:
case r, ok := <-processedChan:
if !ok {
return results, errors.Join(errs...)
}

if res.err != nil {
errs = append(errs, res.err)
continue
if r.err != nil {
errs = append(errs, r.err)
}

results = append(results, res.resp)
results = append(results, r.result)
case <-ctx.Done():
return results, errors.Join(append(errs, ctx.Err())...)
}
}
}

type result[T any] struct {
resp T
err error
type processed[T any] struct {
result T
err error
}
44 changes: 43 additions & 1 deletion polipo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,17 @@ func TestPolipo_Do(t *testing.T) {

allResults, err := p.Do(ctx)
require.ErrorContains(t, err, "nothing in the ocean")
require.Len(t, allResults, 1)
require.Len(t, allResults, 2)
require.ElementsMatch(t, []TaskResult{
{
Fishes: []string{
"Swordfish",
"Marlin",
},
},
{
Fishes: nil,
},
}, allResults)
})

Expand Down Expand Up @@ -194,6 +197,45 @@ func TestPolipo_Do(t *testing.T) {
},
}, allResults)
})

t.Run("adding tasks should return an error if already processing", func(t *testing.T) {
ctx := context.TODO()

p := polipo.NewPolipo[TaskResult]()

err := p.AddTask(func() (TaskResult, error) {
time.Sleep(time.Second)

return TaskResult{}, nil
})
require.NoError(t, err)

go func() {
_, _ = p.Do(ctx)
}()

time.Sleep(time.Millisecond * 100)

err = p.AddTask(func() (TaskResult, error) {
return TaskResult{
Fishes: []string{
"Salmon",
"Tuna",
},
}, nil
})
require.ErrorContains(t, err, "cannot add tasks while processing")
})

t.Run("should return an error if no tasks to do", func(t *testing.T) {
ctx := context.TODO()

p := polipo.NewPolipo[TaskResult]()

allResults, err := p.Do(ctx)
require.ErrorContains(t, err, "no tasks to do")
require.Empty(t, allResults)
})
}

var testCases = []struct {
Expand Down

0 comments on commit d54e8d4

Please sign in to comment.