Skip to content

Commit

Permalink
MINOR: Do not override retries for idempotent producers (apache#8097)
Browse files Browse the repository at this point in the history
The KafkaProducer code would set infinite retries (MAX_INT) if the producer was configured with idempotence and no retries were configured by the user. This is superfluous because KIP-91 changed the retry functionality to both be time-based and the default retries config to be MAX_INT.

Reviewers: Jason Gustafson <[email protected]>
  • Loading branch information
stanislavkozlovski authored Feb 13, 2020
1 parent 46e80db commit ea72ede
Showing 1 changed file with 1 addition and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
apiVersions,
throttleTimeSensor,
logContext);
int retries = configureRetries(producerConfig, log);
short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
Expand All @@ -468,7 +467,7 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
Expand Down Expand Up @@ -527,15 +526,6 @@ private static TransactionManager configureTransactionState(ProducerConfig confi
return transactionManager;
}

private static int configureRetries(ProducerConfig config, Logger log) {
boolean userConfiguredRetries = config.originals().containsKey(ProducerConfig.RETRIES_CONFIG);
if (config.idempotenceEnabled() && !userConfiguredRetries) {
log.info("Overriding the default retries config to the recommended value of {} since the idempotent " +
"producer is enabled.", Integer.MAX_VALUE);
}
return config.getInt(ProducerConfig.RETRIES_CONFIG);
}

private static int configureInflightRequests(ProducerConfig config) {
if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
Expand Down

0 comments on commit ea72ede

Please sign in to comment.