diff --git a/polipo.go b/polipo.go index 5eeae22..e5f0a17 100644 --- a/polipo.go +++ b/polipo.go @@ -17,10 +17,10 @@ type Task[T any] func() (T, error) // Polipo stores a list of Tasks to be run concurrently. type Polipo[T any] struct { sync.RWMutex - tasks []Task[T] - maxConcurrency int - concurrencyBuffer chan struct{} - processing bool // processing is used to prevent adding tasks while Do is running. + tasks []Task[T] // tasks is a list of Tasks to be run concurrently. + maxConcurrency int // maxConcurrency is the maximum number of concurrent tasks to run. + concurrencyBuffer chan struct{} // concurrencyBuffer is used to limit the number of concurrent tasks. + processing bool // processing is used to prevent adding tasks while Do is running. } // NewPolipo creates a new Polipo. It accepts options to configure the Polipo. @@ -65,6 +65,10 @@ func (p *Polipo[T]) AddTask(task Task[T]) error { // set by passing `WithMaxConcurrency` as an option. The default is 10. // This is a blocking function that will return when all the Tasks have finished their work. func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) { + if len(p.tasks) == 0 { + return nil, errors.New("no tasks to do") + } + if p.processing { return nil, errors.New("already processing tasks") } @@ -77,11 +81,7 @@ func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) { p.processing = true - if len(p.tasks) == 0 { - return nil, errors.New("no tasks to do") - } - - resultsChan := make(chan result[T]) + processedChan := make(chan processed[T]) wg := sync.WaitGroup{} wg.Add(len(p.tasks)) @@ -94,10 +94,10 @@ func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) { go func(t Task[T]) { defer wg.Done() - items, err := t() + result, err := t() select { - case resultsChan <- result[T]{items, err}: + case processedChan <- processed[T]{result, err}: case <-ctx.Done(): } @@ -110,7 +110,7 @@ func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) { // Wait for all tasks to finish. go func() { wg.Wait() - close(resultsChan) + close(processedChan) }() var ( @@ -127,24 +127,23 @@ func (p *Polipo[T]) Do(ctx context.Context) ([]T, error) { } select { - case res, ok := <-resultsChan: + case r, ok := <-processedChan: if !ok { return results, errors.Join(errs...) } - if res.err != nil { - errs = append(errs, res.err) - continue + if r.err != nil { + errs = append(errs, r.err) } - results = append(results, res.resp) + results = append(results, r.result) case <-ctx.Done(): return results, errors.Join(append(errs, ctx.Err())...) } } } -type result[T any] struct { - resp T - err error +type processed[T any] struct { + result T + err error } diff --git a/polipo_test.go b/polipo_test.go index d911336..3b3d662 100644 --- a/polipo_test.go +++ b/polipo_test.go @@ -132,7 +132,7 @@ func TestPolipo_Do(t *testing.T) { allResults, err := p.Do(ctx) require.ErrorContains(t, err, "nothing in the ocean") - require.Len(t, allResults, 1) + require.Len(t, allResults, 2) require.ElementsMatch(t, []TaskResult{ { Fishes: []string{ @@ -140,6 +140,9 @@ func TestPolipo_Do(t *testing.T) { "Marlin", }, }, + { + Fishes: nil, + }, }, allResults) }) @@ -194,6 +197,45 @@ func TestPolipo_Do(t *testing.T) { }, }, allResults) }) + + t.Run("adding tasks should return an error if already processing", func(t *testing.T) { + ctx := context.TODO() + + p := polipo.NewPolipo[TaskResult]() + + err := p.AddTask(func() (TaskResult, error) { + time.Sleep(time.Second) + + return TaskResult{}, nil + }) + require.NoError(t, err) + + go func() { + _, _ = p.Do(ctx) + }() + + time.Sleep(time.Millisecond * 100) + + err = p.AddTask(func() (TaskResult, error) { + return TaskResult{ + Fishes: []string{ + "Salmon", + "Tuna", + }, + }, nil + }) + require.ErrorContains(t, err, "cannot add tasks while processing") + }) + + t.Run("should return an error if no tasks to do", func(t *testing.T) { + ctx := context.TODO() + + p := polipo.NewPolipo[TaskResult]() + + allResults, err := p.Do(ctx) + require.ErrorContains(t, err, "no tasks to do") + require.Empty(t, allResults) + }) } var testCases = []struct {