Skip to content

Commit

Permalink
feat(kafka): add producer config capabilities for connections (dapr#3371
Browse files Browse the repository at this point in the history
)

Signed-off-by: Samantha Coyle <[email protected]>
Signed-off-by: Alessandro (Ale) Segala <[email protected]>
Co-authored-by: Alessandro (Ale) Segala <[email protected]>
  • Loading branch information
sicoyle and ItalyPaleAle authored Mar 21, 2024
1 parent 2502256 commit 85252be
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .build-tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
)

require (
github.com/dapr/kit v0.13.0 // indirect
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions .build-tools/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/dapr/kit v0.13.0 h1:4S+5QqDCreva+MBONtIgxeg6B2b1W89bB8F5lqKgTa0=
github.com/dapr/kit v0.13.0/go.mod h1:VyHrelNXPbtS/VcQX0Y/uzW0lfEVuveJ+1E5bDys8mo=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548 h1:rh6cYZ/2nnufSAhnz+fwmsyNWljt0uyxmfIcG6t499I=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
12 changes: 12 additions & 0 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,18 @@ metadata:
example: '"group1"'
binding:
input: true
- name: clientConnectionTopicMetadataRefreshInterval
type: duration
description: |
The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration.
example: '4m'
default: '9m'
- name: clientConnectionKeepAliveInterval
type: duration
description: |
The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely.
example: '4m'
default: '0'
- name: clientID
type: string
description: |
Expand Down
3 changes: 3 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
config.Consumer.Fetch.Default = meta.consumerFetchDefault
config.ChannelBufferSize = meta.channelBufferSize

config.Net.KeepAlive = meta.ClientConnectionKeepAliveInterval
config.Metadata.RefreshFrequency = meta.ClientConnectionTopicMetadataRefreshInterval

if meta.ClientID != "" {
config.ClientID = meta.ClientID
}
Expand Down
87 changes: 56 additions & 31 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ const (
consumerFetchDefault = "consumerFetchDefault"
channelBufferSize = "channelBufferSize"
valueSchemaType = "valueSchemaType"

// Kafka client config default values.
// Refresh interval < keep alive time so that way connection can be kept alive indefinitely if desired.
// This prevents write: broken pipe err when writer does not know connection was closed,
// and continues to publish to closed connection.
clientConnectionTopicMetadataRefreshInterval = "clientConnectionTopicMetadataRefreshInterval"
defaultClientConnectionTopicMetadataRefreshInterval = 8 * time.Minute // needs to be 8 as kafka default for killing idle connections is 9 min
clientConnectionKeepAliveInterval = "clientConnectionKeepAliveInterval"
defaultClientConnectionKeepAliveInterval = time.Duration(0) // default to keep connection alive
)

type KafkaMetadata struct {
Expand Down Expand Up @@ -80,16 +89,23 @@ type KafkaMetadata struct {
Version string `mapstructure:"version"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
internalOidcExtensions map[string]string `mapstructure:"-"`

// configs for kafka client
ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"`
ClientConnectionKeepAliveInterval time.Duration `mapstructure:"clientConnectionKeepAliveInterval"`

// aws iam auth profile
AWSAccessKey string `mapstructure:"awsAccessKey"`
AWSSecretKey string `mapstructure:"awsSecretKey"`
AWSSessionToken string `mapstructure:"awsSessionToken"`
AWSIamRoleArn string `mapstructure:"awsIamRoleArn"`
AWSStsSessionName string `mapstructure:"awsStsSessionName"`
AWSRegion string `mapstructure:"awsRegion"`
channelBufferSize int `mapstructure:"-"`
consumerFetchMin int32 `mapstructure:"-"`
consumerFetchDefault int32 `mapstructure:"-"`
AWSAccessKey string `mapstructure:"awsAccessKey"`
AWSSecretKey string `mapstructure:"awsSecretKey"`
AWSSessionToken string `mapstructure:"awsSessionToken"`
AWSIamRoleArn string `mapstructure:"awsIamRoleArn"`
AWSStsSessionName string `mapstructure:"awsStsSessionName"`
AWSRegion string `mapstructure:"awsRegion"`
channelBufferSize int `mapstructure:"-"`

consumerFetchMin int32 `mapstructure:"-"`
consumerFetchDefault int32 `mapstructure:"-"`

// schema registry
SchemaRegistryURL string `mapstructure:"schemaRegistryURL"`
SchemaRegistryAPIKey string `mapstructure:"schemaRegistryAPIKey"`
Expand All @@ -99,49 +115,49 @@ type KafkaMetadata struct {
}

// upgradeMetadata updates metadata properties based on deprecated usage.
func (k *Kafka) upgradeMetadata(metadata map[string]string) (map[string]string, error) {
authTypeVal, authTypePres := metadata[authType]
authReqVal, authReqPres := metadata["authRequired"]
saslPassVal, saslPassPres := metadata["saslPassword"]
func (k *Kafka) upgradeMetadata(meta map[string]string) (map[string]string, error) {
authTypeKey, authTypeVal, authTypeOk := metadata.GetMetadataPropertyWithMatchedKey(meta, authType)
if authTypeKey == "" {
authTypeKey = "authType"
}
authReqVal, authReqOk := metadata.GetMetadataProperty(meta, "authRequired")
saslPassVal, saslPassOk := metadata.GetMetadataProperty(meta, "saslPassword")

// If authType is not set, derive it from authRequired.
if (!authTypePres || authTypeVal == "") && authReqPres && authReqVal != "" {
if (!authTypeOk || authTypeVal == "") && authReqOk && authReqVal != "" {
k.logger.Warn("AuthRequired is deprecated, use AuthType instead.")
validAuthRequired, err := strconv.ParseBool(authReqVal)
if err == nil {
if validAuthRequired {
// If legacy authRequired was used, either SASL username or mtls is the method.
if saslPassPres && saslPassVal != "" {
if saslPassOk && saslPassVal != "" {
// User has specified saslPassword, so intend for password auth.
metadata[authType] = passwordAuthType
meta[authTypeKey] = passwordAuthType
} else {
metadata[authType] = mtlsAuthType
meta[authTypeKey] = mtlsAuthType
}
} else {
metadata[authType] = noAuthType
meta[authTypeKey] = noAuthType
}
} else {
return metadata, errors.New("kafka error: invalid value for 'authRequired' attribute")
return meta, errors.New("kafka error: invalid value for 'authRequired' attribute")
}
}

// if consumeRetryEnabled is not present, use component default value
consumeRetryEnabledVal, consumeRetryEnabledPres := metadata[consumeRetryEnabled]
if !consumeRetryEnabledPres || consumeRetryEnabledVal == "" {
metadata[consumeRetryEnabled] = strconv.FormatBool(k.DefaultConsumeRetryEnabled)
}

return metadata, nil
return meta, nil
}

// getKafkaMetadata returns new Kafka metadata.
func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error) {
m := KafkaMetadata{
ConsumeRetryInterval: 100 * time.Millisecond,
internalVersion: sarama.V2_0_0_0, //nolint:nosnakecase
channelBufferSize: 256,
consumerFetchMin: 1,
consumerFetchDefault: 1024 * 1024,
ConsumeRetryEnabled: k.DefaultConsumeRetryEnabled,
ConsumeRetryInterval: 100 * time.Millisecond,
internalVersion: sarama.V2_0_0_0, //nolint:nosnakecase
channelBufferSize: 256,
consumerFetchMin: 1,
consumerFetchDefault: 1024 * 1024,
ClientConnectionTopicMetadataRefreshInterval: defaultClientConnectionTopicMetadataRefreshInterval,
ClientConnectionKeepAliveInterval: defaultClientConnectionKeepAliveInterval,
}

err := metadata.DecodeMetadata(meta, &m)
Expand Down Expand Up @@ -307,5 +323,14 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
m.consumerFetchMin = int32(v)
}

// confirm client connection fields are valid
if m.ClientConnectionTopicMetadataRefreshInterval <= 0 {
m.ClientConnectionTopicMetadataRefreshInterval = defaultClientConnectionTopicMetadataRefreshInterval
}

if m.ClientConnectionKeepAliveInterval < 0 {
m.ClientConnectionKeepAliveInterval = defaultClientConnectionKeepAliveInterval
}

return &m, nil
}
38 changes: 38 additions & 0 deletions common/component/kafka/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func assertMetadata(t *testing.T, meta *KafkaMetadata) {
require.Equal(t, int32(1024*1024), meta.consumerFetchDefault)
require.Equal(t, int32(1), meta.consumerFetchMin)
require.Equal(t, 256, meta.channelBufferSize)
require.Equal(t, 8*time.Minute, defaultClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, 0*time.Minute, defaultClientConnectionKeepAliveInterval)
}

func TestMissingBrokers(t *testing.T) {
Expand Down Expand Up @@ -395,6 +397,42 @@ func TestMetadataConsumerFetchValues(t *testing.T) {
require.Equal(t, int32(2048), meta.consumerFetchDefault)
}

func TestMetadataProducerValues(t *testing.T) {
t.Run("using default producer values", func(t *testing.T) {
k := getKafka()
m := getCompleteMetadata()

meta, err := k.getKafkaMetadata(m)
require.NoError(t, err)
require.Equal(t, defaultClientConnectionTopicMetadataRefreshInterval, meta.ClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, defaultClientConnectionKeepAliveInterval, meta.ClientConnectionKeepAliveInterval)
})

t.Run("setting producer values explicitly", func(t *testing.T) {
k := getKafka()
m := getCompleteMetadata()
m[clientConnectionTopicMetadataRefreshInterval] = "3m0s"
m[clientConnectionKeepAliveInterval] = "4m0s"

meta, err := k.getKafkaMetadata(m)
require.NoError(t, err)
require.Equal(t, 3*time.Minute, meta.ClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, 4*time.Minute, meta.ClientConnectionKeepAliveInterval)
})

t.Run("setting producer invalid values so defaults take over", func(t *testing.T) {
k := getKafka()
m := getCompleteMetadata()
m[clientConnectionTopicMetadataRefreshInterval] = "-1h40m0s"
m[clientConnectionKeepAliveInterval] = "-1h40m0s"

meta, err := k.getKafkaMetadata(m)
require.NoError(t, err)
require.Equal(t, defaultClientConnectionTopicMetadataRefreshInterval, meta.ClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, defaultClientConnectionKeepAliveInterval, meta.ClientConnectionKeepAliveInterval)
})
}

func TestMetadataChannelBufferSize(t *testing.T) {
k := getKafka()
m := getCompleteMetadata()
Expand Down
15 changes: 10 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
github.com/cloudwego/kitex-examples v0.1.1
github.com/cyphar/filepath-securejoin v0.2.4
github.com/dancannon/gorethink v4.0.0+incompatible
github.com/dapr/kit v0.13.0
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548
github.com/didip/tollbooth/v7 v7.0.1
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5
Expand All @@ -82,7 +82,7 @@ require (
github.com/kubemq-io/kubemq-go v1.7.9
github.com/labd/commercetools-go-sdk v1.3.1
github.com/lestrrat-go/httprc v1.0.4
github.com/lestrrat-go/jwx/v2 v2.0.19
github.com/lestrrat-go/jwx/v2 v2.0.20
github.com/machinebox/graphql v0.2.2
github.com/matoous/go-nanoid/v2 v2.0.0
github.com/microsoft/go-mssqldb v1.6.0
Expand Down Expand Up @@ -114,7 +114,7 @@ require (
go.mongodb.org/mongo-driver v1.12.1
go.uber.org/multierr v1.11.0
go.uber.org/ratelimit v0.3.0
golang.org/x/crypto v0.18.0
golang.org/x/crypto v0.19.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/mod v0.14.0
golang.org/x/net v0.20.0
Expand Down Expand Up @@ -381,8 +381,8 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.17.0 // indirect
Expand Down Expand Up @@ -431,3 +431,8 @@ replace github.com/microcosm-cc/bluemonday => github.com/microcosm-cc/bluemonday

// this is a fork which addresses a performance issues due to go routines.
replace dubbo.apache.org/dubbo-go/v3 => dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20230118042253-4f159a2b38f3

// Uncomment for local development for testing with changes in the components-contrib && kit repositories.
// Don't commit with this uncommented!
//
// replace github.com/dapr/kit => ../kit
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ github.com/dancannon/gorethink v4.0.0+incompatible h1:KFV7Gha3AuqT+gr0B/eKvGhbjm
github.com/dancannon/gorethink v4.0.0+incompatible/go.mod h1:BLvkat9KmZc1efyYwhz3WnybhRZtgF1K929FD8z1avU=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/dapr/kit v0.13.0 h1:4S+5QqDCreva+MBONtIgxeg6B2b1W89bB8F5lqKgTa0=
github.com/dapr/kit v0.13.0/go.mod h1:VyHrelNXPbtS/VcQX0Y/uzW0lfEVuveJ+1E5bDys8mo=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548 h1:rh6cYZ/2nnufSAhnz+fwmsyNWljt0uyxmfIcG6t499I=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -1082,8 +1082,8 @@ github.com/lestrrat-go/iter v1.0.1/go.mod h1:zIdgO1mRKhn8l9vrZJZz9TUMMFbQbLeTsbq
github.com/lestrrat-go/iter v1.0.2 h1:gMXo1q4c2pHmC3dn8LzRhJfP1ceCbgSiT9lUydIzltI=
github.com/lestrrat-go/iter v1.0.2/go.mod h1:Momfcq3AnRlRjI5b5O8/G5/BvpzrhoFTZcn06fEOPt4=
github.com/lestrrat-go/jwx v1.2.24/go.mod h1:zoNuZymNl5lgdcu6P7K6ie2QRll5HVfF4xwxBBK1NxY=
github.com/lestrrat-go/jwx/v2 v2.0.19 h1:ekv1qEZE6BVct89QA+pRF6+4pCpfVrOnEJnTnT4RXoY=
github.com/lestrrat-go/jwx/v2 v2.0.19/go.mod h1:l3im3coce1lL2cDeAjqmaR+Awx+X8Ih+2k8BuHNJ4CU=
github.com/lestrrat-go/jwx/v2 v2.0.20 h1:sAgXuWS/t8ykxS9Bi2Qtn5Qhpakw1wrcjxChudjolCc=
github.com/lestrrat-go/jwx/v2 v2.0.20/go.mod h1:UlCSmKqw+agm5BsOBfEAbTvKsEApaGNqHAEUTv5PJC4=
github.com/lestrrat-go/option v1.0.0/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU=
github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
Expand Down Expand Up @@ -1753,8 +1753,8 @@ golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -2029,8 +2029,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -2041,8 +2041,8 @@ golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo=
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
12 changes: 12 additions & 0 deletions pubsub/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,18 @@ metadata:
The minimum number of message bytes to fetch in a request.
example: '4'
default: '1'
- name: clientConnectionTopicMetadataRefreshInterval
type: duration
description: |
The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration.
example: '4m'
default: '9m'
- name: clientConnectionKeepAliveInterval
type: duration
description: |
The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely.
example: '4m'
default: '0'
- name: consumerFetchDefault
type: number
description: |
Expand Down
10 changes: 5 additions & 5 deletions tests/certification/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ require (
github.com/dapr/components-contrib v1.13.0-rc.6
github.com/dapr/dapr v1.13.0-rc.7
github.com/dapr/go-sdk v1.6.1-0.20231102031149-87bbb8cd690a
github.com/dapr/kit v0.13.0
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/go-chi/chi/v5 v5.0.11
github.com/go-redis/redis/v8 v8.11.5
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.5.2
github.com/joho/godotenv v1.4.0
github.com/lestrrat-go/jwx/v2 v2.0.19
github.com/lestrrat-go/jwx/v2 v2.0.20
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/rabbitmq/amqp091-go v1.8.1
github.com/riferrei/srclient v0.6.0
Expand Down Expand Up @@ -303,14 +303,14 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.17.0 // indirect
Expand Down
Loading

0 comments on commit 85252be

Please sign in to comment.