diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index 2769f2d306..1942c6398d 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -34,6 +34,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.LongStream; import org.astraea.app.argument.DataRateField; import org.astraea.app.argument.DataSizeField; import org.astraea.app.argument.DistributionTypeField; @@ -58,6 +59,7 @@ import org.astraea.common.admin.Partition; import org.astraea.common.admin.Replica; import org.astraea.common.admin.TopicPartition; +import org.astraea.common.admin.TopicPartitionPath; import org.astraea.common.consumer.Consumer; import org.astraea.common.consumer.ConsumerConfigs; import org.astraea.common.partitioner.Partitioner; @@ -103,52 +105,75 @@ public static List execute(final Argument param) { () -> producerThreads.stream().allMatch(AbstractThread::closed), () -> consumerThreads.stream().allMatch(AbstractThread::closed)); - var fileWriterTask = - CompletableFuture.completedFuture( - param.CSVPath == null - ? (Runnable) (() -> {}) - : ReportFormat.createFileWriter( - param.reportFormat, - param.CSVPath, - () -> consumerThreads.stream().allMatch(AbstractThread::closed), - () -> producerThreads.stream().allMatch(AbstractThread::closed))) - .thenAcceptAsync(Runnable::run); - - var monkeys = MonkeyThread.play(consumerThreads, param); - - CompletableFuture.runAsync( - () -> { - dataGenerator.waitForDone(); - var last = 0L; - var lastChange = System.currentTimeMillis(); - while (true) { - var current = Report.recordsConsumedTotal(); - - if (blockingQueues.stream().allMatch(Collection::isEmpty)) { - var unfinishedProducers = producerThreads.stream().filter(p -> !p.closed()).toList(); - unfinishedProducers.forEach(AbstractThread::close); + try (var admin = Admin.of(param.bootstrapServers())) { + Supplier sizes = + () -> + admin.brokers().toCompletableFuture().join().stream() + .filter( + b -> + b.topicPartitionPaths().stream() + .anyMatch(p -> param.topics.contains(p.topic()))) + .mapToLong( + b -> + b.topicPartitionPaths().stream() + .mapToLong(TopicPartitionPath::size) + .sum()); + + var fileWriterTask = + CompletableFuture.completedFuture( + param.CSVPath == null + ? (Runnable) (() -> {}) + : ReportFormat.createFileWriter( + param.reportFormat, + param.CSVPath, + () -> consumerThreads.stream().allMatch(AbstractThread::closed), + () -> producerThreads.stream().allMatch(AbstractThread::closed), + param.logInterval, + List.of( + ReportFormat.CSVContentElement.create( + "max size", () -> String.valueOf(sizes.get().max().getAsLong())), + ReportFormat.CSVContentElement.create( + "min size", + () -> String.valueOf(sizes.get().min().getAsLong()))))) + .thenAcceptAsync(Runnable::run); + + var monkeys = MonkeyThread.play(consumerThreads, param); + + CompletableFuture.runAsync( + () -> { + dataGenerator.waitForDone(); + var last = 0L; + var lastChange = System.currentTimeMillis(); + while (true) { + var current = Report.recordsConsumedTotal(); + + if (blockingQueues.stream().allMatch(Collection::isEmpty)) { + var unfinishedProducers = + producerThreads.stream().filter(p -> !p.closed()).toList(); + unfinishedProducers.forEach(AbstractThread::close); + } + + if (current != last) { + last = current; + lastChange = System.currentTimeMillis(); + } + if (System.currentTimeMillis() - lastChange >= param.readIdle.toMillis()) { + consumerThreads.forEach(AbstractThread::close); + monkeys.forEach(AbstractThread::close); + } + if (consumerThreads.stream().allMatch(AbstractThread::closed) + && monkeys.stream().allMatch(AbstractThread::closed) + && producerThreads.stream().allMatch(AbstractThread::closed)) return; + Utils.sleep(Duration.ofSeconds(1)); } - - if (current != last) { - last = current; - lastChange = System.currentTimeMillis(); - } - if (System.currentTimeMillis() - lastChange >= param.readIdle.toMillis()) { - consumerThreads.forEach(AbstractThread::close); - monkeys.forEach(AbstractThread::close); - } - if (consumerThreads.stream().allMatch(AbstractThread::closed) - && monkeys.stream().allMatch(AbstractThread::closed) - && producerThreads.stream().allMatch(AbstractThread::closed)) return; - Utils.sleep(Duration.ofSeconds(1)); - } - }); - producerThreads.forEach(AbstractThread::waitForDone); - monkeys.forEach(AbstractThread::waitForDone); - consumerThreads.forEach(AbstractThread::waitForDone); - tracker.waitForDone(); - fileWriterTask.join(); - return param.topics; + }); + producerThreads.forEach(AbstractThread::waitForDone); + monkeys.forEach(AbstractThread::waitForDone); + consumerThreads.forEach(AbstractThread::waitForDone); + tracker.waitForDone(); + fileWriterTask.join(); + return param.topics; + } } static List consumers(Argument param, Map latestOffsets) { @@ -263,6 +288,12 @@ String partitioner() { return this.partitioner; } + @Parameter( + names = {"--log.interval"}, + description = "integer: seconds to log csv output", + validateWith = PositiveLongField.class) + Duration logInterval = Duration.ofSeconds(2); + @Parameter( names = {"--transaction.size"}, description = diff --git a/app/src/main/java/org/astraea/app/performance/ReportFormat.java b/app/src/main/java/org/astraea/app/performance/ReportFormat.java index d57ca80a2a..d7ef9bd032 100644 --- a/app/src/main/java/org/astraea/app/performance/ReportFormat.java +++ b/app/src/main/java/org/astraea/app/performance/ReportFormat.java @@ -69,7 +69,18 @@ public static Runnable createFileWriter( ReportFormat reportFormat, Path path, Supplier consumerDone, - Supplier producerDone) { + Supplier producerDone, + Duration interval) { + return createFileWriter(reportFormat, path, consumerDone, producerDone, interval, List.of()); + } + + public static Runnable createFileWriter( + ReportFormat reportFormat, + Path path, + Supplier consumerDone, + Supplier producerDone, + Duration interval, + List extraCsv) { var filePath = FileSystems.getDefault() .getPath( @@ -81,12 +92,18 @@ public static Runnable createFileWriter( var writer = new BufferedWriter(Utils.packException(() -> new FileWriter(filePath.toFile()))); switch (reportFormat) { case CSV -> { - initCSVFormat(writer, latencyAndIO()); + Supplier> elements = + () -> { + var e = new ArrayList<>(latencyAndIO()); + e.addAll(extraCsv); + return e; + }; + initCSVFormat(writer, elements.get()); return () -> { try { while (!(producerDone.get() && consumerDone.get())) { - logToCSV(writer, latencyAndIO()); - Utils.sleep(Duration.ofSeconds(1)); + logToCSV(writer, elements.get()); + Utils.sleep(interval); } } finally { Utils.close(writer); @@ -138,7 +155,7 @@ static void logToJSON(BufferedWriter writer, List elements) { } // Visible for test - interface CSVContentElement { + public interface CSVContentElement { String title(); String value(); diff --git a/common/src/main/java/org/astraea/common/metrics/collector/ServerMetricFetcher.java b/common/src/main/java/org/astraea/common/metrics/collector/ServerMetricFetcher.java index 3fbe66cbb5..aa2624fcd4 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/ServerMetricFetcher.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/ServerMetricFetcher.java @@ -32,9 +32,11 @@ import org.astraea.common.metrics.JndiClient; public class ServerMetricFetcher implements MetricsReporter, ClientTelemetry { - private static final String BOOTSTRAP_SERVERS = "server.metric.fetcher.bootstrap.servers"; + private static final String BOOTSTRAP_SERVERS_CONFIG = "server.metric.fetcher.bootstrap.servers"; + private static final String INTERVAL_CONFIG = "server.metric.fetcher.interval.seconds"; private String bootstrapServers; private int nodeId = -1; + private Duration interval = Duration.ofSeconds(10); private final AtomicBoolean closed = new AtomicBoolean(false); private final BlockingQueue queue = new LinkedBlockingQueue<>(1); @@ -55,22 +57,23 @@ public void close() { @Override public void configure(Map map) { - if (!map.containsKey(BOOTSTRAP_SERVERS)) - throw new RuntimeException(BOOTSTRAP_SERVERS + " is required"); + if (!map.containsKey(BOOTSTRAP_SERVERS_CONFIG)) + throw new RuntimeException(BOOTSTRAP_SERVERS_CONFIG + " is required"); if (!map.containsKey("node.id")) throw new RuntimeException("node.id is required"); - this.bootstrapServers = map.get(BOOTSTRAP_SERVERS).toString(); + this.bootstrapServers = map.get(BOOTSTRAP_SERVERS_CONFIG).toString(); nodeId = Integer.parseInt(map.get("node.id").toString()); + if (map.containsKey(INTERVAL_CONFIG)) + interval = Duration.ofSeconds(Long.parseLong(map.get(INTERVAL_CONFIG).toString())); CompletableFuture.runAsync( () -> { MetricSender sender = null; var lastSent = System.currentTimeMillis(); try { while (!closed.get()) { - var done = queue.poll(3, TimeUnit.SECONDS); + var done = queue.poll(interval.toSeconds(), TimeUnit.SECONDS); if (done == null) done = false; if (done) return; - if (System.currentTimeMillis() - lastSent <= Duration.ofSeconds(3).toMillis()) - continue; + if (System.currentTimeMillis() - lastSent <= interval.toMillis()) continue; if (sender == null) sender = MetricSender.topic(bootstrapServers); var beans = JndiClient.local().beans(BeanQuery.all()); sender.send(nodeId, beans);