Upgrade Kafka to 3.8.0 (#2180)
* Upgrading Kafka to 3.8.0 - rewriting config properties and adding the necessary dependencies

# Conflicts:
#	gradle.properties
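Most of the property rewriting follows a single mechanical pattern, visible throughout the diffs below: the Scala KafkaConfig.XxxProp() accessors are replaced with the Java constants that Kafka 3.8 spreads across ServerConfigs, ServerLogConfigs, SocketServerConfigs, ReplicationConfigs and GroupCoordinatorConfig. A minimal before/after sketch of the pattern (the class and port value are illustrative, not taken from the commit):

    import java.util.Properties;
    import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
    import org.apache.kafka.network.SocketServerConfigs;
    import org.apache.kafka.server.config.ServerLogConfigs;

    public final class BrokerPropsSketch {
        public static Properties brokerProps(int port) {
            Properties props = new Properties();
            // Before (pre-3.8, Scala accessors on kafka.server.KafkaConfig):
            //   props.setProperty(KafkaConfig.ListenersProp(), "PLAINTEXT://127.0.0.1:" + port);
            //   props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
            // After (3.8, Java constants; the underlying key strings are the same):
            props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:" + port);
            props.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false");
            props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
            return props;
        }
    }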

* Upgrading Kafka to 3.8.0 - using an alternative to the removed getAllTopicConfigs ZooKeeper admin client method
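The hunk for this change is not among the files shown on this page, so the replacement the commit actually uses isn't visible here. One plausible alternative, sketched under the assumption that an org.apache.kafka.clients.admin.Admin client is available (the helper below is hypothetical, not the commit's code), is to list all topics and batch describeConfigs calls:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public final class AllTopicConfigsSketch {
        // Hypothetical stand-in for the removed AdminZkClient.getAllTopicConfigs():
        // fetch every topic's config through the Admin API instead of ZooKeeper.
        public static Map<String, Config> allTopicConfigs(Admin admin)
                throws ExecutionException, InterruptedException {
            Set<ConfigResource> resources = admin.listTopics().names().get().stream()
                    .map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name))
                    .collect(Collectors.toSet());
            return admin.describeConfigs(resources).all().get().entrySet().stream()
                    .collect(Collectors.toMap(e -> e.getKey().name(), Map.Entry::getValue));
        }
    }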

* Upgrading Kafka to 3.8.0 - adopting the 3.8 way of creating the ZooKeeper client

* Upgrading Kafka to 3.8.0 - adopting the 3.8 way of creating the network client

* replication/quota/topic log constants moved again in 3.8

Their values haven't changed, only where they are defined, so the change is backward compatible.
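That backward-compatibility claim is easy to spot-check: the constants' new homes still resolve to the same config key strings the old KafkaConfig accessors returned. A quick sketch (run with -ea; the expected strings are the standard Kafka config names):

    import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
    import org.apache.kafka.server.config.ReplicationConfigs;
    import org.apache.kafka.server.config.ServerLogConfigs;

    public final class ConfigKeyCompatCheck {
        public static void main(String[] args) {
            // Only the constants' location changed in 3.8; the key strings did not.
            assert "default.replication.factor".equals(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG);
            assert "offsets.topic.replication.factor".equals(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
            assert "log.flush.interval.messages".equals(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG);
            System.out.println("config key strings unchanged");
        }
    }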

* Update usages of Metadata to conform to the Kafka 3.7 interface

---------

Co-authored-by: David Simon <[email protected]>
akatona84 and david-simon authored Jan 13, 2025
1 parent 1af2863 commit 97b6d79
Showing 22 changed files with 174 additions and 164 deletions.
7 changes: 6 additions & 1 deletion build.gradle
@@ -279,6 +279,8 @@ project(':cruise-control') {
implementation "io.netty:netty-handler:${nettyVersion}"
implementation "io.netty:netty-transport-native-epoll:${nettyVersion}"
api "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
api "org.apache.kafka:kafka-server:$kafkaVersion"
api "org.apache.kafka:kafka-server-common:$kafkaVersion"
api "org.apache.kafka:kafka-clients:$kafkaVersion"
// Add following dependency when upgrading to Kafka 3.5
api "org.apache.kafka:kafka-storage:$kafkaVersion"
@@ -446,6 +448,7 @@ project(':cruise-control-metrics-reporter') {
implementation "com.yammer.metrics:metrics-core:2.2.0"
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.17.2"
implementation "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
implementation "org.apache.kafka:kafka-server:$kafkaVersion"
implementation "org.apache.kafka:kafka-clients:$kafkaVersion"
implementation 'com.google.code.findbugs:jsr305:3.0.2'
// Temporary pin for vulnerability
@@ -456,7 +459,9 @@ project(':cruise-control-metrics-reporter') {
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
testImplementation 'org.powermock:powermock-api-easymock:2.0.9'
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion:test"
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-server-common:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-group-coordinator:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
testOutput sourceSets.test.output
@@ -6,7 +6,6 @@

import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -16,6 +15,10 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -79,7 +82,7 @@ public Properties overridingProps() {
Properties props = new Properties();
int port = CCKafkaTestUtils.findLocalPort();
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(KafkaConfig.ListenersProp(), "PLAINTEXT://127.0.0.1:" + port);
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
"127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
@@ -91,11 +94,11 @@ public Properties overridingProps() {
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "1");
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
// disable topic auto-creation to leave the metrics reporter to create the metrics topic
props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
props.setProperty(KafkaConfig.NumPartitionsProp(), "2");
props.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false");
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
props.setProperty(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "2");
return props;
}

@@ -8,10 +8,13 @@
import java.io.IOException;
import java.util.Properties;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.Assert;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
@@ -45,14 +48,14 @@ public Properties overridingProps() {
}
}
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(KafkaConfig.ListenersProp(), "SSL://127.0.0.1:" + port);
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), SecurityProtocol.SSL.name);
props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
return props;
}

@@ -20,7 +20,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -35,6 +34,10 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -86,13 +89,13 @@ public Properties overridingProps() {
Properties props = new Properties();
int port = CCKafkaTestUtils.findLocalPort();
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(KafkaConfig.ListenersProp(), "PLAINTEXT://" + HOST + ":" + port);
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://" + HOST + ":" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), HOST + ":" + port);
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
return props;
}

@@ -210,7 +213,8 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
public void testGetKafkaBootstrapServersConfigure() {
// Test with a "listeners" config with a host
Map<Object, Object> brokerConfig = buildBrokerConfigs().get(0);
Map<String, Object> listenersMap = Collections.singletonMap(KafkaConfig.ListenersProp(), brokerConfig.get(KafkaConfig.ListenersProp()));
Map<String, Object> listenersMap = Collections.singletonMap(
SocketServerConfigs.LISTENERS_CONFIG, brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG));
String bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
String urlParse = "\\[?([0-9a-zA-Z\\-%._:]*)]?:(-?[0-9]+)";
Pattern urlParsePattern = Pattern.compile(urlParse);
@@ -219,15 +223,15 @@ public void testGetKafkaBootstrapServersConfigure() {

// Test with a "listeners" config without a host in the first listener.
String listeners = "SSL://:1234,PLAINTEXT://myhost:4321";
listenersMap = Collections.singletonMap(KafkaConfig.ListenersProp(), listeners);
listenersMap = Collections.singletonMap(SocketServerConfigs.LISTENERS_CONFIG, listeners);
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
assertEquals(DEFAULT_BOOTSTRAP_SERVERS_HOST, bootstrapServers.split(":")[0]);
assertEquals("1234", bootstrapServers.split(":")[1]);

// Test with "listeners" and "port" config together.
listenersMap = new HashMap<>();
listenersMap.put(KafkaConfig.ListenersProp(), listeners);
listenersMap.put(SocketServerConfigs.LISTENERS_CONFIG, listeners);
listenersMap.put("port", "43");
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
@@ -18,6 +18,9 @@
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -94,11 +97,11 @@ private static KafkaServer createKafkaServer(KafkaConfig kafkaConfig) throws Cla
}

private void parseConfigs(Map<Object, Object> config) {
_id = Integer.parseInt((String) config.get(KafkaConfig.BrokerIdProp()));
_logDir = new File((String) config.get(KafkaConfig.LogDirProp()));
_id = Integer.parseInt((String) config.get(ServerConfigs.BROKER_ID_CONFIG));
_logDir = new File((String) config.get(ServerLogConfigs.LOG_DIR_CONFIG));

// Bind addresses
String listenersString = (String) config.get(KafkaConfig.ListenersProp());
String listenersString = (String) config.get(SocketServerConfigs.LISTENERS_CONFIG);
for (String protocolAddr : listenersString.split("\\s*,\\s*")) {
try {
URI uri = new URI(protocolAddr.trim());
@@ -9,9 +9,16 @@
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestSslUtils;


@@ -257,27 +264,27 @@ public Map<Object, Object> buildConfig() {
if (_sslPort >= 0) {
csvJoiner.add(SecurityProtocol.SSL.name + "://localhost:" + _sslPort);
}
props.put(KafkaConfig.BrokerIdProp(), Integer.toString(_nodeId));
props.put(KafkaConfig.ListenersProp(), csvJoiner.toString());
props.put(KafkaConfig.LogDirProp(), _logDirectory.getAbsolutePath());
props.put(KafkaConfig.ZkConnectProp(), _zkConnect);
props.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), Long.toString(_socketTimeoutMs));
props.put(KafkaConfig.ControllerSocketTimeoutMsProp(), Long.toString(_socketTimeoutMs));
props.put(KafkaConfig.ControlledShutdownEnableProp(), Boolean.toString(_enableControlledShutdown));
props.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.toString(_enableDeleteTopic));
props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), Long.toString(_controlledShutdownRetryBackoff));
props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), Long.toString(_logCleanerDedupBufferSize));
props.put(KafkaConfig.LogCleanerEnableProp(), Boolean.toString(_enableLogCleaner));
props.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.put(KafkaConfig.SslEndpointIdentificationAlgorithmProp(), "");
props.put(ServerConfigs.BROKER_ID_CONFIG, Integer.toString(_nodeId));
props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
props.put(ServerLogConfigs.LOG_DIR_CONFIG, _logDirectory.getAbsolutePath());
props.put(ZkConfigs.ZK_CONNECT_CONFIG, _zkConnect);
props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, Boolean.toString(_enableControlledShutdown));
props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, Boolean.toString(_enableDeleteTopic));
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_CONFIG, Long.toString(_controlledShutdownRetryBackoff));
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, Long.toString(_logCleanerDedupBufferSize));
props.put(CleanerConfig.LOG_CLEANER_ENABLE_PROP, Boolean.toString(_enableLogCleaner));
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
if (_rack != null) {
props.put(KafkaConfig.RackProp(), _rack);
props.put(ServerConfigs.BROKER_RACK_CONFIG, _rack);
}
if (_trustStore != null || _sslPort > 0) {
try {
props.putAll(TestSslUtils.createSslConfig(false, true, Mode.SERVER, _trustStore, "server" + _nodeId));
// Switch interbroker to ssl
props.put(KafkaConfig.InterBrokerSecurityProtocolProp(), SecurityProtocol.SSL.name);
props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -6,11 +6,11 @@

import java.io.File;
import java.util.Properties;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -56,7 +56,7 @@ protected void setSecurityConfigs(Properties clientProps, String certAlias) {
throw new AssertionError("ssl set but no trust store provided");
}
clientProps.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
clientProps.setProperty(KafkaConfig.SslEndpointIdentificationAlgorithmProp(), "");
clientProps.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
try {
clientProps.putAll(TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, certAlias));
} catch (Exception e) {
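One move in the hunk above worth calling out: the empty ssl.endpoint.identification.algorithm override now comes from the client-side SslConfigs class rather than the broker-side KafkaConfig accessor, but the key string is unchanged, so the existing test behavior (disabled hostname verification) is preserved. A tiny sketch (the class name is illustrative):

    import java.util.Properties;
    import org.apache.kafka.common.config.SslConfigs;

    public final class SslClientPropsSketch {
        public static Properties sslClientProps() {
            Properties clientProps = new Properties();
            // Same key string as the old KafkaConfig.SslEndpointIdentificationAlgorithmProp();
            // an empty value disables hostname verification in tests.
            clientProps.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
            return clientProps;
        }
    }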
@@ -26,12 +26,12 @@
import com.linkedin.kafka.cruisecontrol.detector.AnomalyState;
import com.linkedin.kafka.cruisecontrol.detector.TopicReplicationFactorAnomalyFinder;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import net.minidev.json.JSONArray;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -97,7 +97,7 @@ public Map<Object, Object> overridingProps() {
Map<Object, Object> props = KafkaCruiseControlIntegrationTestUtils.createBrokerProps();
Entry<File, File> logFolders = Map.entry(CCKafkaTestUtils.newTempDir(), CCKafkaTestUtils.newTempDir());
_brokerLogDirs.add(logFolders);
props.put(KafkaConfig.LogDirsProp(), logFolders.getKey().getAbsolutePath() + "," + logFolders.getValue().getAbsolutePath());
props.put(ServerLogConfigs.LOG_DIR_CONFIG, logFolders.getKey().getAbsolutePath() + "," + logFolders.getValue().getAbsolutePath());
return props;
}

@@ -34,7 +34,6 @@
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore;
import kafka.server.KafkaConfig;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -45,6 +44,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.network.SocketServerConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -168,7 +168,7 @@ public static Map<Object, Object> createBrokerProps() {
StringJoiner csvJoiner = new StringJoiner(",");
csvJoiner.add(SecurityProtocol.PLAINTEXT.name + "://localhost:"
+ KafkaCruiseControlIntegrationTestUtils.findRandomOpenPortOnAllLocalInterfaces());
props.put(KafkaConfig.ListenersProp(), csvJoiner.toString());
props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
props.put(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
props.put(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "2");
props.put(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "1");