Skip to content

Commit

Permalink
Merge pull request #21 from navdeepsekhon/20-dup-validation
Browse files Browse the repository at this point in the history
#20 Duplicate topic definition check
  • Loading branch information
navdeepsekhon authored Jul 7, 2021
2 parents ab15b90 + bcf2f74 commit f63627c
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 3 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ jobs:
# fallback to using the latest cache if no exact match is found
- v1-dependencies-

- run: gradle dependencies
- run: ./gradlew dependencies

- save_cache:
paths:
- ~/.gradle
key: v1-dependencies-{{ checksum "build.gradle" }}

# run tests!
- run: gradle test
- run: ./gradlew test
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ apply plugin: 'maven'
apply plugin: 'signing'

group 'co.navdeep'
version '1.4.2'
version '1.4.3'
archivesBaseName = "kafkaer"

sourceCompatibility = 1.8
Expand Down
Empty file modified gradlew
100644 → 100755
Empty file.
21 changes: 21 additions & 0 deletions src/main/java/co/navdeep/kafkaer/Configurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,32 @@ private void waitForDelete(String topicName) throws ExecutionException, Interrup
}
}
public void applyConfig() throws ExecutionException, InterruptedException {
validate();
configureTopics();
configureBrokers();
configureAcls();
}

public void validate(){
validateDuplicateTopics();
}

public void validateDuplicateTopics(){
Set<String> seen = new HashSet<>();
Set<String> duplicates = new HashSet<>();
for(String t : config.getAllTopicNames()){
if(seen.contains(t)){
duplicates.add(t);
}
seen.add(t);
}

if(!duplicates.isEmpty()){
logger.error("These topics are defined multiple times: {}", duplicates);
throw new RuntimeException("Duplicate topic definitions " + duplicates);
}
}

public void configureAcls() throws ExecutionException, InterruptedException {
logger.debug("Configuring ACLs");
List<AclBinding> bindings = config.getAclBindings();
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/ConfiguratorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void testIncreasePartitions() throws ExecutionException, InterruptedExcep
topic.setPartitions(2);
configurator.applyConfig();

sleep();
compareWithKafkaTopic(topic);
}

Expand Down Expand Up @@ -382,6 +383,20 @@ public void testWipeSchema() throws ConfigurationException, IOException, RestCli
Mockito.verify(mock).deleteSubject(ArgumentMatchers.eq("x-value"));
}

@Test(expected = RuntimeException.class)
public void testDuplicateTopicValidation() throws ExecutionException, InterruptedException, ConfigurationException {
Config config = new Config();
String topicName = UUID.randomUUID().toString();
Topic topic = new Topic(topicName, 1, (short)1);
Topic topic2 = new Topic(topicName, 2, (short)1);
config.getTopics().add(topic);
config.getTopics().add(topic2);

Configurator configurator = new Configurator(Utils.readProperties(PROPERTIES_LOCATION), config);
configurator.applyConfig();

}

private void compareWithKafkaTopic(Topic topic) throws ExecutionException, InterruptedException {
DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic.getName()));
TopicDescription kafkaTopic = result.all().get().get(topic.getName());
Expand Down

0 comments on commit f63627c

Please sign in to comment.