Skip to content

Commit

Permalink
[CHECKER] add all request graud (#1856)
Browse files Browse the repository at this point in the history
  • Loading branch information
gongxuanzhang authored Jan 3, 2025
1 parent 787b26e commit dde4e60
Show file tree
Hide file tree
Showing 6 changed files with 376 additions and 13 deletions.
4 changes: 2 additions & 2 deletions app/src/main/java/org/astraea/app/checker/Checker.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

public class Checker {

private static final List<Guard> GUARDS = List.of(new ProduceRpcGuard());
private static final List<Guard> GUARDS = List.of(new RpcGuard());

public static void main(String[] args) throws Exception {
execute(Argument.parse(new Argument(), args));
Expand All @@ -45,7 +45,7 @@ public static void execute(final Argument param) throws Exception {
try (var admin = Admin.create(Map.of("bootstrap.servers", param.bootstrapServers()))) {
for (var guard : GUARDS) {
var result = guard.run(admin, param.mBeanClientFunction(), param.readChangelog());
System.out.println(result);
result.forEach(System.out::println);
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions app/src/main/java/org/astraea/app/checker/Report.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,12 @@ Stream<Report> stream() {
if (why.isEmpty()) return Stream.empty();
return Stream.of(this);
}

@Override
public String toString() {
if (node == null) {
return "Report[pass]";
}
return "Report[" + node + "] why = " + why;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,36 @@
*/
package org.astraea.app.checker;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Node;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.broker.NetworkMetrics;

public class ProduceRpcGuard implements Guard {
public class RpcGuard implements Guard {
@Override
public Collection<Report> run(
Admin admin, Function<Node, MBeanClient> clients, Changelog changelog) throws Exception {
Map<String, Protocol> protocols = changelog.protocols();
return admin.describeCluster().nodes().get().stream()
.map(node -> checkNode(node, protocols, clients))
.flatMap(Collection::stream)
.toList();
}

private Collection<Report> checkNode(
Node node, Map<String, Protocol> protocols, Function<Node, MBeanClient> clients) {
return Arrays.stream(NetworkMetrics.Request.values())
.filter(request -> protocols.containsKey(request.metricName().toLowerCase()))
.map(
node -> {
var protocol =
changelog
.protocols()
.get(NetworkMetrics.Request.PRODUCE.metricName().toLowerCase());
if (protocol == null) return Report.empty();
request -> {
var protocol = protocols.get(request.metricName().toLowerCase());
var versions = NetworkMetrics.Request.PRODUCE.versions(clients.apply(node));
return Report.of(node, protocol, versions);
})
.flatMap(Report::stream)
.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public Set<Integer> versions(MBeanClient mBeanClient) {
BeanQuery.builder()
.domainName("kafka.network")
.property("type", "RequestMetrics")
.property("request", "Produce")
.property("request", this.metricName())
.property("name", "RequestsPerSec")
.property("version", "*")
.build());
Expand Down
Loading

0 comments on commit dde4e60

Please sign in to comment.