Skip to content

Commit

Permalink
fix(test): refactor helpers into common files
Browse files Browse the repository at this point in the history
Move unittest and functional test shared code into helper files

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Dec 27, 2024
1 parent 54ff3b9 commit 45e7b09
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 240 deletions.
70 changes: 0 additions & 70 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,6 @@ import (
"github.com/stretchr/testify/require"
)

const TestMessage = "ABC THE MESSAGE"

func closeProducerWithTimeout(t *testing.T, p AsyncProducer, timeout time.Duration) {
var wg sync.WaitGroup
p.AsyncClose()

closer := make(chan struct{})
timer := time.AfterFunc(timeout, func() {
t.Error("timeout")
close(closer)
})
defer timer.Stop()

wg.Add(2)
go func() {
defer wg.Done()
for {
select {
case <-closer:
return
case _, ok := <-p.Successes():
if !ok {
return
}
t.Error("Unexpected message on Successes()")
}
}
}()
go func() {
defer wg.Done()
for {
select {
case <-closer:
return
case msg, ok := <-p.Errors():
if !ok {
return
}
t.Error(msg.Err)
}
}
}()
wg.Wait()
}

func closeProducer(t *testing.T, p AsyncProducer) {
closeProducerWithTimeout(t, p, 5*time.Minute)
}

func expectResultsWithTimeout(t *testing.T, p AsyncProducer, successCount, errorCount int, timeout time.Duration) {
t.Helper()
expect := successCount + errorCount
Expand Down Expand Up @@ -1614,27 +1565,6 @@ func TestBrokerProducerShutdown(t *testing.T) {
mockBroker.Close()
}

type appendInterceptor struct {
i int
}

func (b *appendInterceptor) OnSend(msg *ProducerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
v, _ := msg.Value.Encode()
msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i))
b.i++
}

func (b *appendInterceptor) OnConsume(msg *ConsumerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i))
b.i++
}

func testProducerInterceptor(
t *testing.T,
interceptors []ProducerInterceptor,
Expand Down
5 changes: 5 additions & 0 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,7 @@ func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicParti
return reversePairPartition
}

//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
if src == dst {
return currentPath, false
Expand Down Expand Up @@ -1023,6 +1024,7 @@ func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, cur
return currentPath, false
}

//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
superCycle := make([]string, len(cycle)-1)
for i := 0; i < len(cycle)-1; i++ {
Expand All @@ -1037,6 +1039,7 @@ func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
return false
}

//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
cycles := make([][]string, 0)
for _, pair := range pairs {
Expand Down Expand Up @@ -1068,6 +1071,7 @@ func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
return false
}

//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
func (p *partitionMovements) isSticky() bool {
for topic, movements := range p.PartitionMovementsByTopic {
movementPairs := make([]consumerPair, len(movements))
Expand All @@ -1085,6 +1089,7 @@ func (p *partitionMovements) isSticky() bool {
return true
}

//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag)
func indexOfSubList(source []string, target []string) int {
targetSize := len(target)
maxCandidate := len(source) - targetSize
Expand Down
8 changes: 0 additions & 8 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ import (
"github.com/rcrowley/go-metrics"
)

func safeClose(t testing.TB, c io.Closer) {
t.Helper()
err := c.Close()
if err != nil {
t.Error(err)
}
}

func TestSimpleClient(t *testing.T) {
seedBroker := NewMockBroker(t, 1)

Expand Down
2 changes: 1 addition & 1 deletion encoder_decoder_fuzz_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:build go1.18
//go:build go1.18 && !functional

package sarama

Expand Down
87 changes: 87 additions & 0 deletions helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package sarama

import (
"io"
"strconv"
"sync"
"testing"
"time"
)

func safeClose(t testing.TB, c io.Closer) {
t.Helper()
err := c.Close()
if err != nil {
t.Error(err)
}
}

func closeProducerWithTimeout(t *testing.T, p AsyncProducer, timeout time.Duration) {
var wg sync.WaitGroup
p.AsyncClose()

closer := make(chan struct{})
timer := time.AfterFunc(timeout, func() {
t.Error("timeout")
close(closer)
})
defer timer.Stop()

wg.Add(2)
go func() {
defer wg.Done()
for {
select {
case <-closer:
return
case _, ok := <-p.Successes():
if !ok {
return
}
t.Error("Unexpected message on Successes()")
}
}
}()
go func() {
defer wg.Done()
for {
select {
case <-closer:
return
case msg, ok := <-p.Errors():
if !ok {
return
}
t.Error(msg.Err)
}
}
}()
wg.Wait()
}

func closeProducer(t *testing.T, p AsyncProducer) {
closeProducerWithTimeout(t, p, 5*time.Minute)
}

const TestMessage = "ABC THE MESSAGE"

type appendInterceptor struct {
i int
}

func (b *appendInterceptor) OnSend(msg *ProducerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
v, _ := msg.Value.Encode()
msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i))
b.i++
}

func (b *appendInterceptor) OnConsume(msg *ConsumerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i))
b.i++
}
Loading

0 comments on commit 45e7b09

Please sign in to comment.