diff --git a/app/src/main/java/org/astraea/app/balancer/BalancerUtils.java b/app/src/main/java/org/astraea/app/balancer/BalancerUtils.java index a0b8d715bd..070d0e17bc 100644 --- a/app/src/main/java/org/astraea/app/balancer/BalancerUtils.java +++ b/app/src/main/java/org/astraea/app/balancer/BalancerUtils.java @@ -36,7 +36,7 @@ import org.astraea.app.cost.HasClusterCost; import org.astraea.app.metrics.HasBeanObject; -class BalancerUtils { +public class BalancerUtils { /** * Create a {@link ClusterInfo} with its log placement replaced by {@link ClusterLogAllocation}. diff --git a/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java b/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java index fce5e21dc7..dbaca9940a 100644 --- a/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java +++ b/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java @@ -25,6 +25,11 @@ @FunctionalInterface public interface RebalancePlanGenerator { + + static RebalancePlanGenerator random(int numberOfShuffle) { + return new ShufflePlanGenerator(() -> numberOfShuffle); + } + /** * Generate a rebalance proposal, noted that this function doesn't require proposing exactly the * same plan for the same input argument. There can be some randomization that takes part in this diff --git a/app/src/main/java/org/astraea/app/cost/ReplicaLeaderCost.java b/app/src/main/java/org/astraea/app/cost/ReplicaLeaderCost.java index b911fd8659..2dc19629f7 100644 --- a/app/src/main/java/org/astraea/app/cost/ReplicaLeaderCost.java +++ b/app/src/main/java/org/astraea/app/cost/ReplicaLeaderCost.java @@ -45,7 +45,15 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) return () -> dispersion.calculate(brokerScore.values()); } - Map leaderCount(ClusterInfo ignored, ClusterBean clusterBean) { + private static Map leaderCount( + ClusterInfo clusterInfo, ClusterBean clusterBean) { + var leaderCount = leaderCount(clusterBean); + // if there is no available metrics, we re-count the leaders based on cluster information + if (leaderCount.values().stream().mapToInt(i -> i).sum() == 0) return leaderCount(clusterInfo); + return leaderCount; + } + + static Map leaderCount(ClusterBean clusterBean) { return clusterBean.all().entrySet().stream() .collect( Collectors.toMap( @@ -60,26 +68,17 @@ Map leaderCount(ClusterInfo ignored, ClusterBean clusterBean) .sum())); } + static Map leaderCount(ClusterInfo clusterInfo) { + return clusterInfo.topics().stream() + .flatMap(t -> clusterInfo.availableReplicaLeaders(t).stream()) + .collect(Collectors.groupingBy(r -> r.nodeInfo().id())) + .entrySet() + .stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().size())); + } + @Override public Optional fetcher() { return Optional.of(c -> List.of(ServerMetrics.ReplicaManager.LEADER_COUNT.fetch(c))); } - - public static class NoMetrics extends ReplicaLeaderCost { - - @Override - Map leaderCount(ClusterInfo clusterInfo, ClusterBean ignored) { - return clusterInfo.topics().stream() - .flatMap(t -> clusterInfo.availableReplicaLeaders(t).stream()) - .collect(Collectors.groupingBy(r -> r.nodeInfo().id())) - .entrySet() - .stream() - .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().size())); - } - - @Override - public Optional fetcher() { - return Optional.empty(); - } - } } diff --git a/app/src/main/java/org/astraea/app/web/BalancerHandler.java b/app/src/main/java/org/astraea/app/web/BalancerHandler.java new file mode 100644 index 0000000000..9d1871baf6 --- /dev/null +++ b/app/src/main/java/org/astraea/app/web/BalancerHandler.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.web; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.astraea.app.admin.Admin; +import org.astraea.app.admin.ClusterBean; +import org.astraea.app.balancer.BalancerUtils; +import org.astraea.app.balancer.RebalancePlanProposal; +import org.astraea.app.balancer.generator.RebalancePlanGenerator; +import org.astraea.app.balancer.log.ClusterLogAllocation; +import org.astraea.app.balancer.log.LogPlacement; +import org.astraea.app.cost.HasClusterCost; +import org.astraea.app.cost.ReplicaLeaderCost; + +class BalancerHandler implements Handler { + + static String LIMIT_KEY = "limit"; + + static int LIMIT_DEFAULT = 10000; + private final Admin admin; + private final RebalancePlanGenerator generator = RebalancePlanGenerator.random(30); + private final HasClusterCost costFunction; + + BalancerHandler(Admin admin) { + this(admin, new ReplicaLeaderCost()); + } + + BalancerHandler(Admin admin, HasClusterCost costFunction) { + this.admin = admin; + this.costFunction = costFunction; + } + + @Override + public Response get(Optional target, Map queries) { + var clusterInfo = admin.clusterInfo(); + var clusterAllocation = ClusterLogAllocation.of(clusterInfo); + var cost = costFunction.clusterCost(clusterInfo, ClusterBean.EMPTY).value(); + var limit = Integer.parseInt(queries.getOrDefault(LIMIT_KEY, String.valueOf(LIMIT_DEFAULT))); + var planAndCost = + generator + .generate(admin.brokerFolders(), clusterAllocation) + .limit(limit) + .map(RebalancePlanProposal::rebalancePlan) + .map( + cla -> + Map.entry( + cla, + costFunction + .clusterCost(BalancerUtils.merge(clusterInfo, cla), ClusterBean.EMPTY) + .value())) + .filter(e -> e.getValue() <= cost) + .min(Comparator.comparingDouble(Map.Entry::getValue)); + + return new Report( + cost, + planAndCost.map(Map.Entry::getValue).orElse(cost), + limit, + costFunction.getClass().getSimpleName(), + planAndCost + .map( + entry -> + ClusterLogAllocation.findNonFulfilledAllocation( + clusterAllocation, entry.getKey()) + .stream() + .map( + tp -> + new Change( + tp.topic(), + tp.partition(), + placements(clusterAllocation.logPlacements(tp)), + placements(entry.getKey().logPlacements(tp)))) + .collect(Collectors.toUnmodifiableList())) + .orElse(List.of())); + } + + static List placements(List lps) { + return lps.stream().map(Placement::new).collect(Collectors.toUnmodifiableList()); + } + + static class Placement { + + final int brokerId; + final String directory; + + Placement(LogPlacement lp) { + this.brokerId = lp.broker(); + this.directory = lp.logDirectory().orElse(null); + } + } + + static class Change { + final String topic; + final int partition; + final List before; + final List after; + + Change(String topic, int partition, List before, List after) { + this.topic = topic; + this.partition = partition; + this.before = before; + this.after = after; + } + } + + static class Report implements Response { + final double cost; + final double newCost; + + final int limit; + + final String function; + final List changes; + + Report(double cost, double newCost, int limit, String function, List changes) { + this.cost = cost; + this.newCost = newCost; + this.limit = limit; + this.function = function; + this.changes = changes; + } + } +} diff --git a/app/src/main/java/org/astraea/app/web/WebService.java b/app/src/main/java/org/astraea/app/web/WebService.java index 3d93b2be21..61f5b95ac2 100644 --- a/app/src/main/java/org/astraea/app/web/WebService.java +++ b/app/src/main/java/org/astraea/app/web/WebService.java @@ -48,6 +48,7 @@ private static void execute(Argument arg) throws IOException { server.createContext( "/records", new RecordHandler(Admin.of(arg.configs()), arg.bootstrapServers())); server.createContext("/reassignments", new ReassignmentHandler(Admin.of(arg.configs()))); + server.createContext("/balancer", new BalancerHandler(Admin.of(arg.configs()))); server.start(); } diff --git a/app/src/test/java/org/astraea/app/cost/ReplicaLeaderCostTest.java b/app/src/test/java/org/astraea/app/cost/ReplicaLeaderCostTest.java index b2e7f85d58..a3bd9685e2 100644 --- a/app/src/test/java/org/astraea/app/cost/ReplicaLeaderCostTest.java +++ b/app/src/test/java/org/astraea/app/cost/ReplicaLeaderCostTest.java @@ -39,11 +39,10 @@ void testNoMetrics() { ReplicaInfo.of("topic", 0, NodeInfo.of(10, "broker0", 1111), true, true, true), ReplicaInfo.of("topic", 0, NodeInfo.of(10, "broker0", 1111), true, true, true), ReplicaInfo.of("topic", 0, NodeInfo.of(11, "broker1", 1111), true, true, true)); - var function = new ReplicaLeaderCost.NoMetrics(); var clusterInfo = Mockito.mock(ClusterInfo.class); Mockito.when(clusterInfo.topics()).thenReturn(Set.of("topic")); Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString())).thenReturn(replicas); - var cost = function.leaderCount(clusterInfo, ClusterBean.EMPTY); + var cost = ReplicaLeaderCost.leaderCount(clusterInfo); Assertions.assertTrue(cost.containsKey(10)); Assertions.assertTrue(cost.containsKey(11)); Assertions.assertEquals(2, cost.size()); diff --git a/app/src/test/java/org/astraea/app/partitioner/StrictCostDispatcherTest.java b/app/src/test/java/org/astraea/app/partitioner/StrictCostDispatcherTest.java index a6887e39b4..af49d3d8d1 100644 --- a/app/src/test/java/org/astraea/app/partitioner/StrictCostDispatcherTest.java +++ b/app/src/test/java/org/astraea/app/partitioner/StrictCostDispatcherTest.java @@ -70,13 +70,13 @@ void testNegativeWeight() { IllegalArgumentException.class, () -> dispatcher.configure( - Configuration.of(Map.of(ReplicaLeaderCost.NoMetrics.class.getName(), "-1")))); + Configuration.of(Map.of(ReplicaLeaderCost.class.getName(), "-1")))); // Test for cost functions configuring dispatcher.configure( Configuration.of( Map.of( - ReplicaLeaderCost.NoMetrics.class.getName(), + ReplicaLeaderCost.class.getName(), "0.1", BrokerInputCost.class.getName(), "2", @@ -92,7 +92,7 @@ void testConfigureCostFunctions() { dispatcher.configure( Configuration.of( Map.of( - ReplicaLeaderCost.NoMetrics.class.getName(), + ReplicaLeaderCost.class.getName(), "0.1", BrokerInputCost.class.getName(), "2", diff --git a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java new file mode 100644 index 0000000000..4d9895bdb2 --- /dev/null +++ b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.web; + +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import org.astraea.app.admin.Admin; +import org.astraea.app.admin.ClusterBean; +import org.astraea.app.admin.ClusterInfo; +import org.astraea.app.common.Utils; +import org.astraea.app.cost.ClusterCost; +import org.astraea.app.cost.HasClusterCost; +import org.astraea.app.service.RequireBrokerCluster; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class BalancerHandlerTest extends RequireBrokerCluster { + + @Test + void testReport() { + var topicName = Utils.randomString(10); + try (var admin = Admin.of(bootstrapServers())) { + admin.creator().topic(topicName).numberOfPartitions(10).numberOfReplicas((short) 3).create(); + Utils.sleep(Duration.ofSeconds(3)); + var handler = new BalancerHandler(admin, new MyCost()); + var report = + Assertions.assertInstanceOf( + BalancerHandler.Report.class, + handler.get(Optional.empty(), Map.of(BalancerHandler.LIMIT_KEY, "30"))); + Assertions.assertEquals(30, report.limit); + Assertions.assertNotEquals(0, report.changes.size()); + Assertions.assertTrue(report.cost >= report.newCost); + Assertions.assertEquals(MyCost.class.getSimpleName(), report.function); + } + } + + private static class MyCost implements HasClusterCost { + private final AtomicInteger count = new AtomicInteger(0); + + @Override + public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { + var cost = count.getAndIncrement() == 0 ? Double.MAX_VALUE : Math.random() * 100; + return () -> cost; + } + } +} diff --git a/docs/web_server/README.md b/docs/web_server/README.md index f0680d4538..95dfea3dbd 100644 --- a/docs/web_server/README.md +++ b/docs/web_server/README.md @@ -24,3 +24,4 @@ Astraea 建立了一套 Web Server 服務,使用者可以透過簡易好上手 - [/beans](./web_api_beans_chinese.md) - [/reassignments](./web_api_reassignments_chinese.md) - [/records](./web_api_records_chinese.md) +- [/balancer](./web_api_balancer_chinese.md) diff --git a/docs/web_server/web_api_balancer_chinese.md b/docs/web_server/web_api_balancer_chinese.md new file mode 100644 index 0000000000..7913d6eb26 --- /dev/null +++ b/docs/web_server/web_api_balancer_chinese.md @@ -0,0 +1,73 @@ +/balancer +=== + +- [查詢更好的 partitions 配置](#查詢更好的-partitions-配置) + +## 查詢更好的 partitions 配置 +```shell +GET /balancer +``` + +參數 + +| 名稱 | 說明 | 預設值 | +|-------|--------------|-------| +| limit | (選填) 要嘗試幾種組合 | 10000 | + +cURL 範例 +```shell +curl -X GET http://localhost:8001/balancer +``` + +JSON Response 範例 +- `cost`: 目前叢集的成本 (越高越不好) +- `newCost`: 評估後比較好的成本 (<= `cost`) +- `limit`: 嘗試了幾種組合 +- `function`: 用來評估品質的方法 +- `changes`: 新的 partitions 配置 + - `topic`: topic 名稱 + - `partition`: partition id + - `before`: 原本的配置 + - `after`: 比較好的配置 +```json +{ + "cost": 0.04948716593053935, + "newCost": 0.04948716593053935, + "limit": 10000, + "function": "ReplicaLeaderCost", + "changes": [ + { + "topic": "__consumer_offsets", + "partition": 40, + "before": [ + { + "brokerId": 1006, + "directory": "/tmp/log-folder-0" + } + ], + "after": [ + { + "brokerId": 1002, + "directory": "/tmp/log-folder-0" + } + ] + }, + { + "topic": "__consumer_offsets", + "partition": 44, + "before": [ + { + "brokerId": 1003, + "directory": "/tmp/log-folder-0" + } + ], + "after": [ + { + "brokerId": 1001, + "directory": "/tmp/log-folder-2" + } + ] + } + ] +} +```