Skip to content

Commit

Permalink
fix(test): ensure versionRange is at least 2.1.0 for 4.0 kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Dec 24, 2024
1 parent 016c7e3 commit fd745c0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
7 changes: 7 additions & 0 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,13 @@ func versionRange(lower KafkaVersion) []KafkaVersion {
upper = MaxVersion
}

// KIP-896 dictates a minimum lower bound of 2.1 protocol for Kafka 4.0 onwards
if upper.IsAtLeast(V4_0_0_0) {
if !lower.IsAtLeast(V2_1_0_0) {
lower = V2_1_0_0
}
}

versions := make([]KafkaVersion, 0, len(fvtRangeVersions))
for _, v := range fvtRangeVersions {
if !v.IsAtLeast(lower) {
Expand Down
14 changes: 10 additions & 4 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,15 +812,21 @@ func testProducingMessages(t *testing.T, config *Config, minVersion KafkaVersion
config.Consumer.Return.Errors = true

kafkaVersions := map[KafkaVersion]bool{}
if upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION")); err != nil {
kafkaVersions[upper] = true
// KIP-896 dictates a minimum lower bound of 2.1 protocol for Kafka 4.0 onwards
if upper.IsAtLeast(V4_0_0_0) {
if !minVersion.IsAtLeast(V2_1_0_0) {
minVersion = V2_1_0_0
}
}
}

for _, v := range []KafkaVersion{MinVersion, V0_10_0_0, V0_11_0_0, V1_0_0_0, V2_0_0_0, V2_1_0_0} {
if v.IsAtLeast(minVersion) {
kafkaVersions[v] = true
}
}
if upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION")); err != nil {
kafkaVersions[upper] = true
}

for version := range kafkaVersions {
name := t.Name() + "-v" + version.String()
t.Run(name, func(t *testing.T) {
Expand Down

0 comments on commit fd745c0

Please sign in to comment.