Merge pull request #11 from navdeepsekhon/9-debug-mode
9 debug mode
navdeepsekhon authored Jun 17, 2020
2 parents a74c50f + ac6fae5 commit 058a93b
Showing 5 changed files with 77 additions and 8 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -23,6 +23,7 @@ dependencies {
compile "org.apache.commons:commons-configuration2:2.4"
compile "commons-beanutils:commons-beanutils:1.9.3"
compile "args4j:args4j:2.33"
compile "org.slf4j:slf4j-simple:1.7.30"

compileOnly "org.projectlombok:lombok:1.18.6"
annotationProcessor "org.projectlombok:lombok:1.18.6"
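
For context on the new dependency: slf4j-simple is the SLF4J binding that the debug switch in this commit relies on. It resolves per-logger levels from org.slf4j.simpleLogger.log.<logger name> system properties (or a simplelogger.properties file on the classpath), and it reads that configuration once, when the first logger is created. A minimal standalone sketch, not part of this commit, with an illustrative class and logger name:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleLoggerLevelDemo {
    public static void main(String[] args) {
        // Must happen before the first LoggerFactory.getLogger(...) call,
        // because slf4j-simple snapshots its configuration at initialization.
        System.setProperty("org.slf4j.simpleLogger.log.co.navdeep", "debug");
        Logger log = LoggerFactory.getLogger("co.navdeep.demo");
        log.debug("visible because the property above lowered the threshold to debug");
    }
}

The same effect is available without code changes via -Dorg.slf4j.simpleLogger.log.co.navdeep=debug on the java command line.
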
9 changes: 9 additions & 0 deletions src/main/java/co/navdeep/kafkaer/App.java
@@ -2,6 +2,8 @@

import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App {
public static void main(String[] a) throws Exception {
@@ -16,10 +18,17 @@ public static void main(String[] a) throws Exception {
parser.printUsage(System.out);
return;
}

if(args.isDebug()){
System.setProperty("org.slf4j.simpleLogger.log.co.navdeep", "debug");
}
Logger logger = LoggerFactory.getLogger(App.class);

if(args.getProperties() == null || args.getConfig() == null) {
throw new RuntimeException("Missing required arguments - propertiesLocation, configLocation");
}

logger.debug("Input args: config: [{}] properties: [{}] wipe:[{}]", args.getConfig(), args.getProperties(), args.isWipe());
Configurator configurator = new Configurator(args.getProperties(), args.getConfig());
if(args.isWipe())
configurator.wipeTopics();
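
In short: when the new debug flag (--debug / -d, defined in Args.java below) is passed, the co.navdeep logger hierarchy is switched to debug before the App logger is created, so the logger.debug calls added in this commit become visible; without the flag, slf4j-simple's default level (info) applies and they stay silent. A hypothetical invocation, where the jar name and the form of the other two arguments are assumptions not shown in this diff:

java -jar kafkaer.jar --debug <propertiesLocation> <configLocation>
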
3 changes: 3 additions & 0 deletions src/main/java/co/navdeep/kafkaer/Args.java
@@ -23,6 +23,9 @@ public class Args {
@Option(name="--help", aliases= "-h", help = true, usage="list usage", handler = BooleanOptionHandler.class)
boolean help;

@Option(name="--debug", aliases = "-d", usage = "debug mode", handler = BooleanOptionHandler.class)
boolean debug;

@Argument
private List<String> arguments = new ArrayList<>();

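
The flag is wired up with args4j's BooleanOptionHandler, which treats the option as a presence flag: it consumes no value, and the annotated field flips to true when the option appears on the command line. A small standalone sketch, with class and option names chosen for illustration rather than taken from this repo:

import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.spi.BooleanOptionHandler;

public class FlagDemo {
    @Option(name = "--debug", aliases = "-d", usage = "debug mode", handler = BooleanOptionHandler.class)
    boolean debug;

    public static void main(String[] argv) throws CmdLineException {
        FlagDemo demo = new FlagDemo();
        new CmdLineParser(demo).parseArgument("--debug");
        System.out.println(demo.debug); // prints true; no value follows the flag
    }
}
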
52 changes: 46 additions & 6 deletions src/main/java/co/navdeep/kafkaer/Configurator.java
@@ -11,6 +11,9 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
@@ -25,6 +28,8 @@ public class Configurator {
private Config config;
private AdminClient adminClient;

private static Logger logger = LoggerFactory.getLogger(Configurator.class);

public Configurator(String propertiesLocation, String configLocation) throws ConfigurationException, IOException {
properties = Utils.readProperties(propertiesLocation);
config = Utils.readConfig(configLocation, Utils.propertiesToMap(properties));
@@ -38,28 +43,54 @@ public Configurator(Configuration p, Config c){
}

public void wipeTopics() throws ExecutionException, InterruptedException {
logger.debug("Deleting topics");
DeleteTopicsResult result = adminClient.deleteTopics(config.getAllTopicNames());
result.all().get();
for(String topic : result.values().keySet()){
try {
logger.debug("Deleting topic: {}", topic);
result.values().get(topic).get();
} catch(ExecutionException e){
if(e.getCause() instanceof UnknownTopicOrPartitionException){
logger.debug("Unable to delete topic {} because it does not exist.", topic);
} else {
throw e;
}
}

}
}

public void applyConfig() throws ExecutionException, InterruptedException {
configureTopics();
configureBrokers();
configureAcls();
}

public void configureAcls() throws ExecutionException, InterruptedException {
logger.debug("Configuring ACLs");
List<AclBinding> bindings = config.getAclBindings();
if(bindings.isEmpty()) return;
if(bindings.isEmpty()){
logger.debug("No ACLs defined in config. Nothing done.");
return;
}

CreateAclsResult result = adminClient.createAcls(bindings);
result.all().get();
for(AclBinding binding : result.values().keySet()){
logger.debug("Creating ACL {}", binding);
result.values().get(binding).get();
}
}

public void configureBrokers() throws ExecutionException, InterruptedException {
if(!config.hasBrokerConfig()) return;
logger.debug("Configuring brokers");
if(!config.hasBrokerConfig()){
logger.debug("No broker configs defined. Nothing done.");
return;
}

Map<ConfigResource, org.apache.kafka.clients.admin.Config> updateConfig = new HashMap<>();
for(Broker broker : config.getBrokers()){
logger.debug("Applying broker config {}", broker);
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, broker.getId());
updateConfig.put(configResource, broker.configsAsKafkaConfig());
}
@@ -70,15 +101,23 @@ public void configureBrokers() throws ExecutionException, InterruptedException {
}

public void configureTopics() throws ExecutionException, InterruptedException {
logger.debug("Configuring topics");
Map<String, KafkaFuture<TopicDescription>> topicResults = adminClient.describeTopics(config.getAllTopicNames()).values();
for(Topic topic : config.getTopics()){
logger.debug("Topic config: {}", topic);
try {
TopicDescription td = topicResults.get(topic.getName()).get();
logger.debug("Updating existing topic {}", topic.getName());
handleTopicPartitionsUpdate(td, topic);
handleTopicConfigUpdate(topic);
} catch(ExecutionException e){
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(topic.toNewTopic()));
result.all().get();
if(e.getCause() instanceof UnknownTopicOrPartitionException) {
logger.debug("Creating new topic {}", topic.getName());
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(topic.toNewTopic()));
result.all().get();
} else {
throw(e);
}
}
}
}
@@ -98,6 +137,7 @@ private void handleTopicConfigUpdate(Topic topic) throws InterruptedException {
private void handleTopicPartitionsUpdate(TopicDescription current, Topic topic) throws InterruptedException {
try {
if(current.partitions().size() < topic.getPartitions()){
logger.debug("Updating partition count for topic {} from [{}] to [{}]", topic.getName(), current.partitions().size(), topic.getPartitions());
CreatePartitionsResult result = adminClient.createPartitions(Collections.singletonMap(topic.getName(), NewPartitions.increaseTo(topic.getPartitions())));
result.all().get();
} else if(current.partitions().size() > topic.getPartitions()){
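
A note on the error handling introduced above: AdminClient operations return KafkaFutures, and a broker-side failure surfaces as an ExecutionException that wraps the real Kafka error, so the code inspects getCause() to decide whether the failure is benign (UnknownTopicOrPartitionException) or should be rethrown. A condensed sketch of the same pattern, assuming an already-configured AdminClient and an illustrative topic name:

import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class DeleteIfExists {
    // Deletes one topic, treating "topic does not exist" as a no-op.
    static void deleteQuietly(AdminClient adminClient, String topic)
            throws ExecutionException, InterruptedException {
        DeleteTopicsResult result = adminClient.deleteTopics(Collections.singleton(topic));
        try {
            result.values().get(topic).get();
        } catch (ExecutionException e) {
            // The per-topic future wraps the broker error; unwrap it to check the real cause.
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw e;
            }
        }
    }
}
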
20 changes: 18 additions & 2 deletions src/test/java/ConfiguratorTest.java
@@ -1,17 +1,16 @@
import co.navdeep.kafkaer.Configurator;
import co.navdeep.kafkaer.model.Acl;
import co.navdeep.kafkaer.model.Broker;
import co.navdeep.kafkaer.model.Config;
import co.navdeep.kafkaer.model.Topic;
import co.navdeep.kafkaer.utils.Utils;
import co.navdeep.kafkaer.model.Config;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.junit.*;

@@ -270,6 +269,23 @@ public void testWipe() throws ConfigurationException, ExecutionException, Interr
Assert.assertFalse(adminClient.listTopics().names().get().contains(topic.getName()));
}

@Test
public void testNonExistingTopicWipeNoException() throws ConfigurationException {
Config config = new Config();
String topicName = UUID.randomUUID().toString();
Topic topic = new Topic(topicName, 1, (short)1);
config.getTopics().add(topic);

Configurator configurator = new Configurator(Utils.readProperties(PROPERTIES_LOCATION), config);

try {
configurator.wipeTopics();
} catch(Exception e){
e.printStackTrace();
Assert.fail("wipeTopics should not throw when the topic does not exist");
}
}

private void compareWithKafkaTopic(Topic topic) throws ExecutionException, InterruptedException {
DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic.getName()));
TopicDescription kafkaTopic = result.all().get().get(topic.getName());
