Skip to content

Commit

Permalink
Fix default retention time value in offset commit (#2700)
Browse files Browse the repository at this point in the history
The retention time field of the offset commit request uses -1 to mean
"use the broker's default". Sarama uses the value 0 in the
`Config.Consumer.Offsets.Retenton` field to mean "use the broker's
default". Ensure that Sarama's default (0) is correctly mapped to the
broker's default (-1).

Fixes: #2677

Signed-off-by: Adrian Preston <[email protected]>
  • Loading branch information
prestona authored Nov 2, 2023
1 parent f97ced2 commit 2e077cf
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
10 changes: 7 additions & 3 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,13 @@ func (om *offsetManager) constructRequest() *OffsetCommitRequest {

// request controlled retention was only supported from V2-V4 (it became
// broker-only after that) so if the user has set the config options then
// flow those through as retention time on the commit request
if r.Version >= 2 && r.Version < 5 && om.conf.Consumer.Offsets.Retention > 0 {
r.RetentionTime = int64(om.conf.Consumer.Offsets.Retention / time.Millisecond)
// flow those through as retention time on the commit request.
if r.Version >= 2 && r.Version < 5 {
// Map Sarama's default of 0 to Kafka's default of -1
r.RetentionTime = -1
if om.conf.Consumer.Offsets.Retention > 0 {
r.RetentionTime = int64(om.conf.Consumer.Offsets.Retention / time.Millisecond)
}
}

om.pomsLock.RLock()
Expand Down
61 changes: 61 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"errors"
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -572,3 +573,63 @@ func TestAbortPartitionOffsetManager(t *testing.T) {
safeClose(t, om)
safeClose(t, testClient)
}

// Validate that the constructRequest() method correctly maps Sarama's default for
// Config.Consumer.Offsets.Retention to the equivalent Kafka value.
func TestConstructRequestRetentionTime(t *testing.T) {
expectedRetention := func(version KafkaVersion, retention time.Duration) int64 {
switch {
case version.IsAtLeast(V2_1_0_0):
// version >= 2.1.0: Client specified retention time isn't supported in the
// offset commit request anymore, thus the retention time field set in the
// OffsetCommitRequest struct should be 0.
return 0
case version.IsAtLeast(V0_9_0_0):
// 0.9.0 <= version < 2.1.0: Retention time *is* supported in the offset commit
// request. Sarama's default retention times (0) must be mapped to the Kafka
// default (-1). Non-zero Sarama times are converted from time.Duration to
// an int64 millisecond value.
if retention > 0 {
return int64(retention / time.Millisecond)
} else {
return -1
}
default:
// version < 0.9.0: Client specified retention time is not supported in the offset
// commit request, thus the retention time field set in the OffsetCommitRequest
// struct should be 0.
return 0
}
}

for _, version := range SupportedVersions {
for _, retention := range []time.Duration{0, time.Millisecond} {
name := fmt.Sprintf("version %s retention: %s", version, retention)
t.Run(name, func(t *testing.T) {
// Perform necessary setup for calling the constructRequest() method. This
// test-case only cares about the code path that sets the retention time
// field in the returned request struct.
conf := NewTestConfig()
conf.Version = version
conf.Consumer.Offsets.Retention = retention
om := &offsetManager{
conf: conf,
poms: map[string]map[int32]*partitionOffsetManager{
"topic": {
0: {
dirty: true,
},
},
},
}

req := om.constructRequest()

expectedRetention := expectedRetention(version, retention)
if req.RetentionTime != expectedRetention {
t.Errorf("expected retention time %d, got: %d", expectedRetention, req.RetentionTime)
}
})
}
}
}

0 comments on commit 2e077cf

Please sign in to comment.