Skip to content

Commit

Permalink
Fix unstable performance tool when it tries to create topic with a bu… (
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Aug 23, 2022
1 parent 41d8181 commit bc3bd16
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 39 deletions.
87 changes: 48 additions & 39 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -94,34 +93,20 @@ private static DataSupplier dataSupplier(Performance.Argument argument) {
}

public static String execute(final Argument param) throws InterruptedException, IOException {
List<Integer> partitions;
Map<TopicPartition, Long> latestOffsets;
try (var topicAdmin = Admin.of(param.configs())) {
topicAdmin
.creator()
.numberOfReplicas(param.replicas)
.numberOfPartitions(param.partitions)
.topic(param.topic)
.create();

Utils.waitFor(() -> topicAdmin.topicNames().contains(param.topic));
partitions = new ArrayList<>(partition(param, topicAdmin));
latestOffsets =
topicAdmin.offsets(Set.of(param.topic)).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().latest()));
}

var groupId = "groupId-" + System.currentTimeMillis();
// always try to init topic even though it may be existent already.
param.initTopic();

Supplier<Integer> partitionSupplier =
() -> partitions.isEmpty() ? -1 : partitions.get((int) (Math.random() * partitions.size()));
var latestOffsets = param.lastOffsets();

var groupId = "groupId-" + System.currentTimeMillis();

var producerThreads =
ProducerThread.create(
param.topic,
param.transactionSize,
dataSupplier(param),
partitionSupplier,
param.partitionSupplier(),
param.producers,
param::createProducer);
var consumerThreads =
Expand Down Expand Up @@ -206,23 +191,6 @@ public static String execute(final Argument param) throws InterruptedException,
return param.topic;
}

// visible for test
static Set<Integer> partition(Argument param, Admin topicAdmin) {
if (positiveSpecifyBroker(param)) {
return topicAdmin
.partitions(Set.of(param.topic), new HashSet<>(param.specifyBroker))
.values()
.stream()
.flatMap(Collection::stream)
.map(TopicPartition::partition)
.collect(Collectors.toSet());
} else return Set.of(-1);
}

private static boolean positiveSpecifyBroker(Argument param) {
return param.specifyBroker.stream().allMatch(broker -> broker >= 0);
}

public static class Argument extends org.astraea.app.argument.Argument {

@Parameter(
Expand All @@ -231,6 +199,35 @@ public static class Argument extends org.astraea.app.argument.Argument {
validateWith = NonEmptyStringField.class)
String topic = "testPerformance-" + System.currentTimeMillis();

void initTopic() {
try (var admin = Admin.of(configs())) {
admin
.creator()
.numberOfReplicas(replicas)
.numberOfPartitions(partitions)
.topic(topic)
.create();
Utils.waitFor(() -> admin.topicNames().contains(topic));
}
}

Map<TopicPartition, Long> lastOffsets() {
try (var admin = Admin.of(configs())) {
// the slow zk causes unknown error, so we have to wait it.
return Utils.waitForNonNull(
() -> {
try {
return admin.offsets(Set.of(topic)).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().latest()));
} catch (Exception e) {
e.printStackTrace();
return null;
}
},
Duration.ofSeconds(5));
}
}

@Parameter(
names = {"--partitions"},
description = "Integer: number of partitions to create the topic",
Expand Down Expand Up @@ -340,7 +337,19 @@ Producer<byte[], byte[]> createProducer() {
description =
"String: Used with SpecifyBrokerPartitioner to specify the brokers that partitioner can send.",
validateWith = NonEmptyStringField.class)
List<Integer> specifyBroker = List.of(-1);
List<Integer> specifyBroker = List.of();

Supplier<Integer> partitionSupplier() {
if (specifyBroker.isEmpty()) return () -> -1;
try (var admin = Admin.of(configs())) {
var partitions =
admin.partitions(Set.of(topic), new HashSet<>(specifyBroker)).values().stream()
.flatMap(Collection::stream)
.map(TopicPartition::partition)
.collect(Collectors.toUnmodifiableList());
return () -> partitions.get((int) (Math.random() * partitions.size()));
}
}

// replace DataSize by DataRate (see https://github.com/skiptests/astraea/issues/488)
@Parameter(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.common;

public class ExecutionRuntimeExceptionTest {}
97 changes: 97 additions & 0 deletions app/src/test/java/org/astraea/app/performance/PerformanceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@

import com.beust.jcommander.ParameterException;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.admin.Admin;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.argument.Argument;
import org.astraea.app.common.Utils;
import org.astraea.app.consumer.Isolation;
import org.astraea.app.service.RequireBrokerCluster;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -166,4 +172,95 @@ void testChaosFrequency() {
new String[] {"--bootstrap.servers", "localhost:9092", "--chaos.frequency", "10s"});
Assertions.assertEquals(Duration.ofSeconds(10), args.chaosDuration);
}

@Test
void testPartitionSupplier() {
var topicName = Utils.randomString(10);
try (var admin = Admin.of(bootstrapServers())) {
admin.creator().topic(topicName).numberOfPartitions(6).numberOfReplicas((short) 3).create();
Utils.sleep(Duration.ofSeconds(2));
var args =
Argument.parse(
new Performance.Argument(),
new String[] {
"--bootstrap.servers",
bootstrapServers(),
"--topic",
topicName,
"--specify.broker",
"1"
});
var expectedPartitions =
admin.partitions(Set.of(topicName), Set.of(1)).get(1).stream()
.map(TopicPartition::partition)
.collect(Collectors.toUnmodifiableSet());
Assertions.assertNotEquals(2, expectedPartitions.size());

var partitionSupplier = args.partitionSupplier();

var actual =
IntStream.range(0, 100)
.mapToObj(ignored -> partitionSupplier.get())
.collect(Collectors.toUnmodifiableSet());
Assertions.assertEquals(expectedPartitions, actual);

// no specify broker
Assertions.assertEquals(
-1,
Argument.parse(
new Performance.Argument(),
new String[] {"--bootstrap.servers", bootstrapServers(), "--topic", topicName})
.partitionSupplier()
.get());
}
}

@Test
void testLastOffsets() {
var partitionCount = 40;
var topicName = Utils.randomString(10);
try (var admin = Admin.of(bootstrapServers())) {
// large partitions
admin.creator().topic(topicName).numberOfPartitions(partitionCount).create();
Utils.sleep(Duration.ofSeconds(2));
var args =
Argument.parse(
new Performance.Argument(),
new String[] {"--bootstrap.servers", bootstrapServers(), "--topic", topicName});
try (var producer = args.createProducer()) {
IntStream.range(0, 250)
.forEach(
i -> producer.sender().topic(topicName).key(String.valueOf(i).getBytes()).run());
}
Assertions.assertEquals(partitionCount, args.lastOffsets().size());
System.out.println(args.lastOffsets());
args.lastOffsets().values().forEach(v -> Assertions.assertNotEquals(0, v));
}
}

@Test
void testInitTopic() {
var topicName = Utils.randomString(10);
try (var admin = Admin.of(bootstrapServers())) {
admin.creator().topic(topicName).numberOfPartitions(3).create();
Utils.sleep(Duration.ofSeconds(2));
var args =
Argument.parse(
new Performance.Argument(),
new String[] {
"--bootstrap.servers",
bootstrapServers(),
"--topic",
topicName,
"--partitions",
"3",
"--replicas",
"1"
});
// they should all pass since the passed arguments are equal to existent topic
args.initTopic();
args.initTopic();
args.initTopic();
}
}
}

0 comments on commit bc3bd16

Please sign in to comment.