diff --git a/async_producer_test.go b/async_producer_test.go index 44da15c61..8e395c755 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -19,55 +19,6 @@ import ( "github.com/stretchr/testify/require" ) -const TestMessage = "ABC THE MESSAGE" - -func closeProducerWithTimeout(t *testing.T, p AsyncProducer, timeout time.Duration) { - var wg sync.WaitGroup - p.AsyncClose() - - closer := make(chan struct{}) - timer := time.AfterFunc(timeout, func() { - t.Error("timeout") - close(closer) - }) - defer timer.Stop() - - wg.Add(2) - go func() { - defer wg.Done() - for { - select { - case <-closer: - return - case _, ok := <-p.Successes(): - if !ok { - return - } - t.Error("Unexpected message on Successes()") - } - } - }() - go func() { - defer wg.Done() - for { - select { - case <-closer: - return - case msg, ok := <-p.Errors(): - if !ok { - return - } - t.Error(msg.Err) - } - } - }() - wg.Wait() -} - -func closeProducer(t *testing.T, p AsyncProducer) { - closeProducerWithTimeout(t, p, 5*time.Minute) -} - func expectResultsWithTimeout(t *testing.T, p AsyncProducer, successCount, errorCount int, timeout time.Duration) { t.Helper() expect := successCount + errorCount @@ -1614,27 +1565,6 @@ func TestBrokerProducerShutdown(t *testing.T) { mockBroker.Close() } -type appendInterceptor struct { - i int -} - -func (b *appendInterceptor) OnSend(msg *ProducerMessage) { - if b.i < 0 { - panic("hey, the interceptor has failed") - } - v, _ := msg.Value.Encode() - msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i)) - b.i++ -} - -func (b *appendInterceptor) OnConsume(msg *ConsumerMessage) { - if b.i < 0 { - panic("hey, the interceptor has failed") - } - msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i)) - b.i++ -} - func testProducerInterceptor( t *testing.T, interceptors []ProducerInterceptor, diff --git a/balance_strategy.go b/balance_strategy.go index 30d41779c..b5bc30a13 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -989,6 +989,7 @@ func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicParti return reversePairPartition } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) { if src == dst { return currentPath, false @@ -1023,6 +1024,7 @@ func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, cur return currentPath, false } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) in(cycle []string, cycles [][]string) bool { superCycle := make([]string, len(cycle)-1) for i := 0; i < len(cycle)-1; i++ { @@ -1037,6 +1039,7 @@ func (p *partitionMovements) in(cycle []string, cycles [][]string) bool { return false } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) hasCycles(pairs []consumerPair) bool { cycles := make([][]string, 0) for _, pair := range pairs { @@ -1068,6 +1071,7 @@ func (p *partitionMovements) hasCycles(pairs []consumerPair) bool { return false } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) isSticky() bool { for topic, movements := range p.PartitionMovementsByTopic { movementPairs := make([]consumerPair, len(movements)) @@ -1085,6 +1089,7 @@ func (p *partitionMovements) isSticky() bool { return true } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func indexOfSubList(source []string, target []string) int { targetSize := len(target) maxCandidate := len(source) - targetSize diff --git a/client_test.go b/client_test.go index 6280f334f..117a25e40 100644 --- a/client_test.go +++ b/client_test.go @@ -14,14 +14,6 @@ import ( "github.com/rcrowley/go-metrics" ) -func safeClose(t testing.TB, c io.Closer) { - t.Helper() - err := c.Close() - if err != nil { - t.Error(err) - } -} - func TestSimpleClient(t *testing.T) { seedBroker := NewMockBroker(t, 1) diff --git a/encoder_decoder_fuzz_test.go b/encoder_decoder_fuzz_test.go index 968fb1a29..1ccfadf22 100644 --- a/encoder_decoder_fuzz_test.go +++ b/encoder_decoder_fuzz_test.go @@ -1,4 +1,4 @@ -//go:build go1.18 +//go:build go1.18 && !functional package sarama diff --git a/helpers_test.go b/helpers_test.go new file mode 100644 index 000000000..1ccc69dd9 --- /dev/null +++ b/helpers_test.go @@ -0,0 +1,87 @@ +package sarama + +import ( + "io" + "strconv" + "sync" + "testing" + "time" +) + +func safeClose(t testing.TB, c io.Closer) { + t.Helper() + err := c.Close() + if err != nil { + t.Error(err) + } +} + +func closeProducerWithTimeout(t *testing.T, p AsyncProducer, timeout time.Duration) { + var wg sync.WaitGroup + p.AsyncClose() + + closer := make(chan struct{}) + timer := time.AfterFunc(timeout, func() { + t.Error("timeout") + close(closer) + }) + defer timer.Stop() + + wg.Add(2) + go func() { + defer wg.Done() + for { + select { + case <-closer: + return + case _, ok := <-p.Successes(): + if !ok { + return + } + t.Error("Unexpected message on Successes()") + } + } + }() + go func() { + defer wg.Done() + for { + select { + case <-closer: + return + case msg, ok := <-p.Errors(): + if !ok { + return + } + t.Error(msg.Err) + } + } + }() + wg.Wait() +} + +func closeProducer(t *testing.T, p AsyncProducer) { + closeProducerWithTimeout(t, p, 5*time.Minute) +} + +const TestMessage = "ABC THE MESSAGE" + +type appendInterceptor struct { + i int +} + +func (b *appendInterceptor) OnSend(msg *ProducerMessage) { + if b.i < 0 { + panic("hey, the interceptor has failed") + } + v, _ := msg.Value.Encode() + msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i)) + b.i++ +} + +func (b *appendInterceptor) OnConsume(msg *ConsumerMessage) { + if b.i < 0 { + panic("hey, the interceptor has failed") + } + msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i)) + b.i++ +} diff --git a/metrics_helpers_test.go b/metrics_helpers_test.go new file mode 100644 index 000000000..14c5719e4 --- /dev/null +++ b/metrics_helpers_test.go @@ -0,0 +1,169 @@ +package sarama + +import ( + "testing" + + "github.com/rcrowley/go-metrics" +) + +// Common type and functions for metric validation +type metricValidator struct { + name string + validator func(*testing.T, interface{}) +} + +type metricValidators []*metricValidator + +func newMetricValidators() metricValidators { + return make([]*metricValidator, 0, 32) +} + +func (m *metricValidators) register(validator *metricValidator) { + *m = append(*m, validator) +} + +func (m *metricValidators) registerForBroker(broker *Broker, validator *metricValidator) { + m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator}) +} + +func (m *metricValidators) registerForGlobalAndTopic(topic string, validator *metricValidator) { + m.register(&metricValidator{validator.name, validator.validator}) + m.register(&metricValidator{getMetricNameForTopic(validator.name, topic), validator.validator}) +} + +func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metricValidator) { + m.register(validator) + m.registerForBroker(broker, validator) +} + +func (m metricValidators) run(t *testing.T, r metrics.Registry) { + t.Helper() + for _, metricValidator := range m { + metric := r.Get(metricValidator.name) + if metric == nil { + t.Error("No metric named", metricValidator.name) + } else { + metricValidator.validator(t, metric) + } + } +} + +func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter)) *metricValidator { + return &metricValidator{ + name: name, + validator: func(t *testing.T, metric interface{}) { + t.Helper() + if meter, ok := metric.(metrics.Meter); !ok { + t.Errorf("Expected meter metric for '%s', got %T", name, metric) + } else { + extraValidator(t, meter) + } + }, + } +} + +func countMeterValidator(name string, expectedCount int) *metricValidator { + return meterValidator(name, func(t *testing.T, meter metrics.Meter) { + t.Helper() + count := meter.Count() + if count != int64(expectedCount) { + t.Errorf("Expected meter metric '%s' count = %d, got %d", name, expectedCount, count) + } + }) +} + +func minCountMeterValidator(name string, minCount int) *metricValidator { + return meterValidator(name, func(t *testing.T, meter metrics.Meter) { + t.Helper() + count := meter.Count() + if count < int64(minCount) { + t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count) + } + }) +} + +func histogramValidator(name string, extraValidator func(*testing.T, metrics.Histogram)) *metricValidator { + return &metricValidator{ + name: name, + validator: func(t *testing.T, metric interface{}) { + t.Helper() + if histogram, ok := metric.(metrics.Histogram); !ok { + t.Errorf("Expected histogram metric for '%s', got %T", name, metric) + } else { + extraValidator(t, histogram) + } + }, + } +} + +func countHistogramValidator(name string, expectedCount int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() + count := histogram.Count() + if count != int64(expectedCount) { + t.Errorf("Expected histogram metric '%s' count = %d, got %d", name, expectedCount, count) + } + }) +} + +func minCountHistogramValidator(name string, minCount int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() + count := histogram.Count() + if count < int64(minCount) { + t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count) + } + }) +} + +//nolint:unused // this is used but only in unittests which are excluded by the integration build tag +func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() + min := int(histogram.Min()) + if min != expectedMin { + t.Errorf("Expected histogram metric '%s' min = %d, got %d", name, expectedMin, min) + } + max := int(histogram.Max()) + if max != expectedMax { + t.Errorf("Expected histogram metric '%s' max = %d, got %d", name, expectedMax, max) + } + }) +} + +func minValHistogramValidator(name string, minMin int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() + min := int(histogram.Min()) + if min < minMin { + t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min) + } + }) +} + +func maxValHistogramValidator(name string, maxMax int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() + max := int(histogram.Max()) + if max > maxMax { + t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max) + } + }) +} + +func counterValidator(name string, expectedCount int) *metricValidator { + return &metricValidator{ + name: name, + validator: func(t *testing.T, metric interface{}) { + t.Helper() + if counter, ok := metric.(metrics.Counter); !ok { + t.Errorf("Expected counter metric for '%s', got %T", name, metric) + } else { + count := counter.Count() + if count != int64(expectedCount) { + t.Errorf("Expected counter metric '%s' count = %d, got %d", name, expectedCount, count) + } + } + }, + } +} diff --git a/metrics_test.go b/metrics_test.go index 954032979..27edcc7bd 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -38,164 +38,3 @@ func TestGetMetricNameForBroker(t *testing.T) { t.Error("Unexpected metric name", metricName) } } - -// Common type and functions for metric validation -type metricValidator struct { - name string - validator func(*testing.T, interface{}) -} - -type metricValidators []*metricValidator - -func newMetricValidators() metricValidators { - return make([]*metricValidator, 0, 32) -} - -func (m *metricValidators) register(validator *metricValidator) { - *m = append(*m, validator) -} - -func (m *metricValidators) registerForBroker(broker *Broker, validator *metricValidator) { - m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator}) -} - -func (m *metricValidators) registerForGlobalAndTopic(topic string, validator *metricValidator) { - m.register(&metricValidator{validator.name, validator.validator}) - m.register(&metricValidator{getMetricNameForTopic(validator.name, topic), validator.validator}) -} - -func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metricValidator) { - m.register(validator) - m.registerForBroker(broker, validator) -} - -func (m metricValidators) run(t *testing.T, r metrics.Registry) { - t.Helper() - for _, metricValidator := range m { - metric := r.Get(metricValidator.name) - if metric == nil { - t.Error("No metric named", metricValidator.name) - } else { - metricValidator.validator(t, metric) - } - } -} - -func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter)) *metricValidator { - return &metricValidator{ - name: name, - validator: func(t *testing.T, metric interface{}) { - t.Helper() - if meter, ok := metric.(metrics.Meter); !ok { - t.Errorf("Expected meter metric for '%s', got %T", name, metric) - } else { - extraValidator(t, meter) - } - }, - } -} - -func countMeterValidator(name string, expectedCount int) *metricValidator { - return meterValidator(name, func(t *testing.T, meter metrics.Meter) { - t.Helper() - count := meter.Count() - if count != int64(expectedCount) { - t.Errorf("Expected meter metric '%s' count = %d, got %d", name, expectedCount, count) - } - }) -} - -func minCountMeterValidator(name string, minCount int) *metricValidator { - return meterValidator(name, func(t *testing.T, meter metrics.Meter) { - t.Helper() - count := meter.Count() - if count < int64(minCount) { - t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count) - } - }) -} - -func histogramValidator(name string, extraValidator func(*testing.T, metrics.Histogram)) *metricValidator { - return &metricValidator{ - name: name, - validator: func(t *testing.T, metric interface{}) { - t.Helper() - if histogram, ok := metric.(metrics.Histogram); !ok { - t.Errorf("Expected histogram metric for '%s', got %T", name, metric) - } else { - extraValidator(t, histogram) - } - }, - } -} - -func countHistogramValidator(name string, expectedCount int) *metricValidator { - return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { - t.Helper() - count := histogram.Count() - if count != int64(expectedCount) { - t.Errorf("Expected histogram metric '%s' count = %d, got %d", name, expectedCount, count) - } - }) -} - -func minCountHistogramValidator(name string, minCount int) *metricValidator { - return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { - t.Helper() - count := histogram.Count() - if count < int64(minCount) { - t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count) - } - }) -} - -func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator { - return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { - t.Helper() - min := int(histogram.Min()) - if min != expectedMin { - t.Errorf("Expected histogram metric '%s' min = %d, got %d", name, expectedMin, min) - } - max := int(histogram.Max()) - if max != expectedMax { - t.Errorf("Expected histogram metric '%s' max = %d, got %d", name, expectedMax, max) - } - }) -} - -func minValHistogramValidator(name string, minMin int) *metricValidator { - return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { - t.Helper() - min := int(histogram.Min()) - if min < minMin { - t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min) - } - }) -} - -func maxValHistogramValidator(name string, maxMax int) *metricValidator { - return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { - t.Helper() - max := int(histogram.Max()) - if max > maxMax { - t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max) - } - }) -} - -func counterValidator(name string, expectedCount int) *metricValidator { - return &metricValidator{ - name: name, - validator: func(t *testing.T, metric interface{}) { - t.Helper() - if counter, ok := metric.(metrics.Counter); !ok { - t.Errorf("Expected counter metric for '%s', got %T", name, metric) - } else { - count := counter.Count() - if count != int64(expectedCount) { - t.Errorf("Expected counter metric '%s' count = %d, got %d", name, expectedCount, count) - } - } - }, - } -}