Skip to content

Commit

Permalink
Added the ErrorGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
pieterclaerhout committed Sep 30, 2019
1 parent 21a5523 commit e757679
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 3 deletions.
73 changes: 73 additions & 0 deletions errgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package waitgroup

import (
"context"
"sync"
)

// An ErrorGroup is a collection of goroutines working on subtasks that are part of
// the same overall task.
type ErrorGroup struct {
size int
pool chan byte

cancel func()

wg sync.WaitGroup

errOnce sync.Once
err error
}

// NewErrorGroup returns a new ErrorGroup instance
func NewErrorGroup(ctx context.Context, size int) (*ErrorGroup, context.Context) {
ctx, cancel := context.WithCancel(ctx)
wg := &ErrorGroup{
size: size,
cancel: cancel,
}
if size > 0 {
wg.pool = make(chan byte, size)
}
return wg, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *ErrorGroup) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}

// Add calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *ErrorGroup) Add(f func() error) {

if g.size > 0 {
g.pool <- 1
}
g.wg.Add(1)

go func() {
defer func() {
if g.size > 0 {
<-g.pool
}
g.wg.Done()
}()

if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
46 changes: 46 additions & 0 deletions errgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package waitgroup_test

import (
"context"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/pieterclaerhout/go-waitgroup"
)

func Test_ErrorGroup_Add(t *testing.T) {

type test struct {
name string
size int
}

var tests = []test{
{"single", 1},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {

ctx := context.Background()

wg, _ := waitgroup.NewErrorGroup(ctx, tc.size)
assert.NotNil(t, wg)

wg.Add(func() error {
return nil
})

wg.Add(func() error {
return errors.New("An error occurred")
})

err := wg.Wait()
assert.Error(t, err)

})
}

}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ module github.com/pieterclaerhout/go-waitgroup

go 1.12

require github.com/stretchr/testify v1.4.0
require (
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.4.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
1 change: 0 additions & 1 deletion waitgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package waitgroup

import (
"sync"
// "sync/atomic"
)

// WaitGroup implements a simple goruntine pool.
Expand Down
29 changes: 28 additions & 1 deletion waitgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/pieterclaerhout/go-waitgroup"
)

func Test_Waitgroup(t *testing.T) {
func Test_WaitGroup(t *testing.T) {

type test struct {
name string
Expand All @@ -24,6 +24,7 @@ func Test_Waitgroup(t *testing.T) {

wg := waitgroup.NewWaitGroup(tc.size)
assert.NotNil(t, wg)

assert.Zero(t, wg.PendingCount(), "pending-before")

wg.BlockAdd()
Expand All @@ -41,3 +42,29 @@ func Test_Waitgroup(t *testing.T) {
}

}

func Test_WaitGroup_Add(t *testing.T) {

type test struct {
name string
size int
}

var tests = []test{
{"single", 1},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {

wg := waitgroup.NewWaitGroup(tc.size)
assert.NotNil(t, wg)

wg.Add(func() {})

wg.Wait()

})
}

}

0 comments on commit e757679

Please sign in to comment.