Skip to content

Commit

Permalink
Kafka: Add support to configure heartbeat interval and session timeou…
Browse files Browse the repository at this point in the history
…t to kafka's consumer (dapr#3375)

Signed-off-by: denisbchrsk <[email protected]>
  • Loading branch information
denisbchrsk authored Mar 21, 2024
1 parent 85252be commit b9c12df
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 0 deletions.
12 changes: 12 additions & 0 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ metadata:
Disables consumer retry by setting this to "false".
example: '"true"'
default: '"false"'
- name: heartbeatInterval
type: duration
description: |
The interval between heartbeats to the consumer coordinator.
example: '"5s"'
default: '"3s"'
- name: sessionTimeout
type: duration
description: |
The maximum time between heartbeats before the consumer is considered inactive and will timeout.
example: '"20s"'
default: '"10s"'
- name: version
type: string
description: |
Expand Down
2 changes: 2 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
config.Consumer.Offsets.Initial = k.initialOffset
config.Consumer.Fetch.Min = meta.consumerFetchMin
config.Consumer.Fetch.Default = meta.consumerFetchDefault
config.Consumer.Group.Heartbeat.Interval = meta.HeartbeatInterval
config.Consumer.Group.Session.Timeout = meta.SessionTimeout
config.ChannelBufferSize = meta.channelBufferSize

config.Net.KeepAlive = meta.ClientConnectionKeepAliveInterval
Expand Down
4 changes: 4 additions & 0 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type KafkaMetadata struct {
TLSClientKey string `mapstructure:"clientKey"`
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
Version string `mapstructure:"version"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
internalOidcExtensions map[string]string `mapstructure:"-"`
Expand Down Expand Up @@ -158,6 +160,8 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
consumerFetchDefault: 1024 * 1024,
ClientConnectionTopicMetadataRefreshInterval: defaultClientConnectionTopicMetadataRefreshInterval,
ClientConnectionKeepAliveInterval: defaultClientConnectionKeepAliveInterval,
HeartbeatInterval: 3 * time.Second,
SessionTimeout: 10 * time.Second,
}

err := metadata.DecodeMetadata(meta, &m)
Expand Down
62 changes: 62 additions & 0 deletions common/component/kafka/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func getCompleteMetadata() map[string]string {
consumerFetchDefault: "1048576",
consumerFetchMin: "1",
channelBufferSize: "256",
"heartbeatInterval": "2s",
"sessionTimeout": "30s",
}
}

Expand Down Expand Up @@ -132,6 +134,8 @@ func assertMetadata(t *testing.T, meta *KafkaMetadata) {
require.Equal(t, int32(1024*1024), meta.consumerFetchDefault)
require.Equal(t, int32(1), meta.consumerFetchMin)
require.Equal(t, 256, meta.channelBufferSize)
require.Equal(t, 2*time.Second, meta.HeartbeatInterval)
require.Equal(t, 30*time.Second, meta.SessionTimeout)
require.Equal(t, 8*time.Minute, defaultClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, 0*time.Minute, defaultClientConnectionKeepAliveInterval)
}
Expand Down Expand Up @@ -443,6 +447,64 @@ func TestMetadataChannelBufferSize(t *testing.T) {
require.Equal(t, 128, meta.channelBufferSize)
}

func TestMetadataHeartbeartInterval(t *testing.T) {
k := getKafka()

t.Run("default heartbeat interval", func(t *testing.T) {
// arrange
m := getBaseMetadata()

// act
meta, err := k.getKafkaMetadata(m)

// assert
require.NoError(t, err)
require.Equal(t, 3*time.Second, meta.HeartbeatInterval)
})

t.Run("with heartbeat interval set", func(t *testing.T) {
// arrange
m := getBaseMetadata()
m["heartbeatInterval"] = "1s"

// act
meta, err := k.getKafkaMetadata(m)

// assert
require.NoError(t, err)
require.Equal(t, 1*time.Second, meta.HeartbeatInterval)
})
}

func TestMetadataSessionTimeout(t *testing.T) {
k := getKafka()

t.Run("default session timeout", func(t *testing.T) {
// arrange
m := getBaseMetadata()

// act
meta, err := k.getKafkaMetadata(m)

// assert
require.NoError(t, err)
require.Equal(t, 10*time.Second, meta.SessionTimeout)
})

t.Run("with session timeout set", func(t *testing.T) {
// arrange
m := getBaseMetadata()
m["sessionTimeout"] = "20s"

// act
meta, err := k.getKafkaMetadata(m)

// assert
require.NoError(t, err)
require.Equal(t, 20*time.Second, meta.SessionTimeout)
})
}

func TestGetEventMetadata(t *testing.T) {
ts := time.Now()

Expand Down
12 changes: 12 additions & 0 deletions pubsub/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,18 @@ metadata:
Disables consumer retry by setting this to "false".
example: '"true"'
default: '"false"'
- name: heartbeatInterval
type: duration
description: |
The interval between heartbeats to the consumer coordinator.
example: '"5s"'
default: '"3s"'
- name: sessionTimeout
type: duration
description: |
The maximum time between heartbeats before the consumer is considered inactive and will timeout.
example: '"20s"'
default: '"10s"'
- name: version
type: string
description: |
Expand Down

0 comments on commit b9c12df

Please sign in to comment.