diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index 1ad2c557ee..df5b074e8e 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.nio.file.Path; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -94,34 +93,20 @@ private static DataSupplier dataSupplier(Performance.Argument argument) { } public static String execute(final Argument param) throws InterruptedException, IOException { - List partitions; - Map latestOffsets; - try (var topicAdmin = Admin.of(param.configs())) { - topicAdmin - .creator() - .numberOfReplicas(param.replicas) - .numberOfPartitions(param.partitions) - .topic(param.topic) - .create(); - - Utils.waitFor(() -> topicAdmin.topicNames().contains(param.topic)); - partitions = new ArrayList<>(partition(param, topicAdmin)); - latestOffsets = - topicAdmin.offsets(Set.of(param.topic)).entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().latest())); - } - var groupId = "groupId-" + System.currentTimeMillis(); + // always try to init topic even though it may be existent already. + param.initTopic(); - Supplier partitionSupplier = - () -> partitions.isEmpty() ? -1 : partitions.get((int) (Math.random() * partitions.size())); + var latestOffsets = param.lastOffsets(); + + var groupId = "groupId-" + System.currentTimeMillis(); var producerThreads = ProducerThread.create( param.topic, param.transactionSize, dataSupplier(param), - partitionSupplier, + param.partitionSupplier(), param.producers, param::createProducer); var consumerThreads = @@ -206,23 +191,6 @@ public static String execute(final Argument param) throws InterruptedException, return param.topic; } - // visible for test - static Set partition(Argument param, Admin topicAdmin) { - if (positiveSpecifyBroker(param)) { - return topicAdmin - .partitions(Set.of(param.topic), new HashSet<>(param.specifyBroker)) - .values() - .stream() - .flatMap(Collection::stream) - .map(TopicPartition::partition) - .collect(Collectors.toSet()); - } else return Set.of(-1); - } - - private static boolean positiveSpecifyBroker(Argument param) { - return param.specifyBroker.stream().allMatch(broker -> broker >= 0); - } - public static class Argument extends org.astraea.app.argument.Argument { @Parameter( @@ -231,6 +199,35 @@ public static class Argument extends org.astraea.app.argument.Argument { validateWith = NonEmptyStringField.class) String topic = "testPerformance-" + System.currentTimeMillis(); + void initTopic() { + try (var admin = Admin.of(configs())) { + admin + .creator() + .numberOfReplicas(replicas) + .numberOfPartitions(partitions) + .topic(topic) + .create(); + Utils.waitFor(() -> admin.topicNames().contains(topic)); + } + } + + Map lastOffsets() { + try (var admin = Admin.of(configs())) { + // the slow zk causes unknown error, so we have to wait it. + return Utils.waitForNonNull( + () -> { + try { + return admin.offsets(Set.of(topic)).entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().latest())); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + }, + Duration.ofSeconds(5)); + } + } + @Parameter( names = {"--partitions"}, description = "Integer: number of partitions to create the topic", @@ -340,7 +337,19 @@ Producer createProducer() { description = "String: Used with SpecifyBrokerPartitioner to specify the brokers that partitioner can send.", validateWith = NonEmptyStringField.class) - List specifyBroker = List.of(-1); + List specifyBroker = List.of(); + + Supplier partitionSupplier() { + if (specifyBroker.isEmpty()) return () -> -1; + try (var admin = Admin.of(configs())) { + var partitions = + admin.partitions(Set.of(topic), new HashSet<>(specifyBroker)).values().stream() + .flatMap(Collection::stream) + .map(TopicPartition::partition) + .collect(Collectors.toUnmodifiableList()); + return () -> partitions.get((int) (Math.random() * partitions.size())); + } + } // replace DataSize by DataRate (see https://github.com/skiptests/astraea/issues/488) @Parameter( diff --git a/app/src/test/java/org/astraea/app/common/ExecutionRuntimeExceptionTest.java b/app/src/test/java/org/astraea/app/common/ExecutionRuntimeExceptionTest.java new file mode 100644 index 0000000000..968a92e401 --- /dev/null +++ b/app/src/test/java/org/astraea/app/common/ExecutionRuntimeExceptionTest.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.common; + +public class ExecutionRuntimeExceptionTest {} diff --git a/app/src/test/java/org/astraea/app/performance/PerformanceTest.java b/app/src/test/java/org/astraea/app/performance/PerformanceTest.java index 62a70cd962..efceb3337d 100644 --- a/app/src/test/java/org/astraea/app/performance/PerformanceTest.java +++ b/app/src/test/java/org/astraea/app/performance/PerformanceTest.java @@ -18,9 +18,15 @@ import com.beust.jcommander.ParameterException; import java.time.Duration; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.astraea.app.admin.Admin; +import org.astraea.app.admin.TopicPartition; import org.astraea.app.argument.Argument; +import org.astraea.app.common.Utils; import org.astraea.app.consumer.Isolation; import org.astraea.app.service.RequireBrokerCluster; import org.junit.jupiter.api.Assertions; @@ -166,4 +172,95 @@ void testChaosFrequency() { new String[] {"--bootstrap.servers", "localhost:9092", "--chaos.frequency", "10s"}); Assertions.assertEquals(Duration.ofSeconds(10), args.chaosDuration); } + + @Test + void testPartitionSupplier() { + var topicName = Utils.randomString(10); + try (var admin = Admin.of(bootstrapServers())) { + admin.creator().topic(topicName).numberOfPartitions(6).numberOfReplicas((short) 3).create(); + Utils.sleep(Duration.ofSeconds(2)); + var args = + Argument.parse( + new Performance.Argument(), + new String[] { + "--bootstrap.servers", + bootstrapServers(), + "--topic", + topicName, + "--specify.broker", + "1" + }); + var expectedPartitions = + admin.partitions(Set.of(topicName), Set.of(1)).get(1).stream() + .map(TopicPartition::partition) + .collect(Collectors.toUnmodifiableSet()); + Assertions.assertNotEquals(2, expectedPartitions.size()); + + var partitionSupplier = args.partitionSupplier(); + + var actual = + IntStream.range(0, 100) + .mapToObj(ignored -> partitionSupplier.get()) + .collect(Collectors.toUnmodifiableSet()); + Assertions.assertEquals(expectedPartitions, actual); + + // no specify broker + Assertions.assertEquals( + -1, + Argument.parse( + new Performance.Argument(), + new String[] {"--bootstrap.servers", bootstrapServers(), "--topic", topicName}) + .partitionSupplier() + .get()); + } + } + + @Test + void testLastOffsets() { + var partitionCount = 40; + var topicName = Utils.randomString(10); + try (var admin = Admin.of(bootstrapServers())) { + // large partitions + admin.creator().topic(topicName).numberOfPartitions(partitionCount).create(); + Utils.sleep(Duration.ofSeconds(2)); + var args = + Argument.parse( + new Performance.Argument(), + new String[] {"--bootstrap.servers", bootstrapServers(), "--topic", topicName}); + try (var producer = args.createProducer()) { + IntStream.range(0, 250) + .forEach( + i -> producer.sender().topic(topicName).key(String.valueOf(i).getBytes()).run()); + } + Assertions.assertEquals(partitionCount, args.lastOffsets().size()); + System.out.println(args.lastOffsets()); + args.lastOffsets().values().forEach(v -> Assertions.assertNotEquals(0, v)); + } + } + + @Test + void testInitTopic() { + var topicName = Utils.randomString(10); + try (var admin = Admin.of(bootstrapServers())) { + admin.creator().topic(topicName).numberOfPartitions(3).create(); + Utils.sleep(Duration.ofSeconds(2)); + var args = + Argument.parse( + new Performance.Argument(), + new String[] { + "--bootstrap.servers", + bootstrapServers(), + "--topic", + topicName, + "--partitions", + "3", + "--replicas", + "1" + }); + // they should all pass since the passed arguments are equal to existent topic + args.initTopic(); + args.initTopic(); + args.initTopic(); + } + } }