Skip to content

Commit

Permalink
Add Kafka idempotent producer config (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
lmsilva-wls authored Dec 6, 2022
1 parent 5fdfd5f commit 30c1638
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 0 deletions.
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (a *App) loadConfigurationDefaults() {
a.config.SetDefault("kafka.producer.net.dialTimeout", "500ms")
a.config.SetDefault("kafka.producer.net.readTimeout", "250ms")
a.config.SetDefault("kafka.producer.net.writeTimeout", "250ms")
a.config.SetDefault("kafka.producer.idempotent", false)
a.config.SetDefault("kafka.producer.net.keepAlive", "60s")
a.config.SetDefault("kafka.producer.brokers", "localhost:9192")
a.config.SetDefault("kafka.producer.maxMessageBytes", 1000000)
Expand Down
1 change: 1 addition & 0 deletions forwarder/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewKafkaForwarder(config *viper.Viper) (*KafkaForwarder, error) {
kafkaConf.Net.KeepAlive = config.GetDuration("kafka.producer.net.keepAlive")
kafkaConf.Producer.Return.Errors = true
kafkaConf.Producer.Return.Successes = true
kafkaConf.Producer.Idempotent = config.GetBool("kafka.producer.idempotent")
kafkaConf.Producer.MaxMessageBytes = config.GetInt("kafka.producer.maxMessageBytes")
kafkaConf.Producer.Timeout = config.GetDuration("kafka.producer.timeout")
kafkaConf.Producer.Flush.Bytes = config.GetInt("kafka.producer.batch.size")
Expand Down

0 comments on commit 30c1638

Please sign in to comment.