diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java index 39af56ea04ed7..feced54011acb 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java @@ -24,23 +24,11 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; -import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; - import static org.awaitility.Awaitility.await; public class KafkaUtils { private static final Logger LOGGER = LogManager.getLogger(KafkaUtils.class); - /** - * Creates kafka topic - * - * @param topicName the topic name - * @param bootstrapServer kafka bootstrap server list - */ - public static void createTopic(String topicName, String bootstrapServer) { - createTopic(topicName, 1, bootstrapServer); - } - public static void createTopic(String topicName, int numOfPartitions, String bootstrapServers) { try { getAdminClient(bootstrapServers, (client -> { @@ -60,14 +48,11 @@ public static void createTopic(String topicName, int numOfPartitions, String boo public static boolean checkTopicExistence(String topicName, String bootstrapServers) { return getAdminClient(bootstrapServers, (client -> { - Map> topics = client.describeTopics(List.of(topicName)).values(); + Map> topics = client.describeTopics(List.of(topicName)).topicNameValues(); try { return topics.containsKey(topicName) && topics.get(topicName).get().name().equals(topicName); - } catch (InterruptedException e) { - LOGGER.error("error on checkTopicExistence", e); - return false; - } catch (ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { LOGGER.error("error on checkTopicExistence", e); return false; } @@ -75,13 +60,12 @@ public static boolean checkTopicExistence(String topicName, String bootstrapServ } private static Rep getAdminClient(String bootstrapServer, Function function) { - AdminClient adminClient = KafkaAdminClient.create( - ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test") - ); - try { + try ( + AdminClient adminClient = KafkaAdminClient.create( + Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test") + ) + ) { return function.apply(adminClient); - } finally { - adminClient.close(); } } }