Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to bypass the dynamic bandwidth estimation in NetworkCost #1839

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions common/src/main/java/org/astraea/common/cost/NetworkCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,24 @@ public MetricSensor metricSensor() {
.toList();
}

/**
* Set {@code partitionNetIn} and {@code partitionNetOut} network usage to the given {@code
* handle}. This function offers a way to specify the network usage of each partition directly,
* which can be crucial for situations like software simulation, where every partition resource
* usage and pattern is deterministic before the simulation run, so there is no need to trigger a
* bandwidth estimation.
*
* @param handle a {@link ClusterBean} object associate with the following partition usage.
* @param partitionNetIn the network ingress usage of each partition.
* @param partitionNetOut the network egress usage of each partition.
*/
public void setCalculation(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個方法放在這裡覺有點粗暴,似乎會變成這個 cost 獨自使用的功能?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個方法放在這裡覺有點粗暴

我也覺得這個方法太暴露,我有想到其他做法是,讓 cachedCalculation 變成 protected,這樣實驗的程式碼可以自己去繼承新的 class 做這些控制。更正確的做法是把計算的這個過程抽象成一個介面,然後讓目前的 estimation 和這個 PR 的直接指定都成為該介面的實作。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

更正確的做法是把計算的這個過程抽象成一個介面,然後讓目前的 estimation 和這個 PR 的直接指定都成為該介面的實作。

這個手法很優雅,有機會在這支Pr完成嗎?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我這邊工作已經告一段落了,所以我想這個需求已經沒有存在的必要,不過我可以幫忙開 Issue 描述一下,如果你覺得有必要保留這個改變

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok 那再麻煩一下

ClusterBean handle,
Map<TopicPartition, Long> partitionNetIn,
Map<TopicPartition, Long> partitionNetOut) {
this.calculationCache.put(handle, new CachedCalculation(partitionNetIn, partitionNetOut));
}

private Map<BrokerTopic, List<Replica>> mapLeaderAllocation(ClusterInfo clusterInfo) {
return clusterInfo
.replicaStream()
Expand Down Expand Up @@ -342,6 +360,11 @@ private CachedCalculation(ClusterBean sourceMetric) {
this.partitionEgressRate =
estimateRate(metricViewCluster, sourceMetric, ServerMetrics.Topic.BYTES_OUT_PER_SEC);
}

private CachedCalculation(Map<TopicPartition, Long> netIn, Map<TopicPartition, Long> netOut) {
this.partitionIngressRate = netIn;
this.partitionEgressRate = netOut;
}
}

static class NetworkClusterCost implements ClusterCost {
Expand Down
35 changes: 35 additions & 0 deletions common/src/test/java/org/astraea/common/cost/NetworkCostTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

class NetworkCostTest {

Expand Down Expand Up @@ -490,6 +491,40 @@ void testPartitionCost() {
Assertions.assertEquals(ingressPartitionCost.get(TopicPartition.of("test-8")), (double) 9 / 18);
}

@ParameterizedTest
@ValueSource(classes = {NetworkIngressCost.class, NetworkEgressCost.class})
void testSetCalculation(Class<? extends NetworkCost> costClass) {
var cluster =
ClusterInfo.builder()
.addNode(Set.of(1, 2))
.addFolders(Map.of(1, Set.of("/a")))
.addFolders(Map.of(2, Set.of("/a")))
.addTopic("topic", 1, (short) 1)
.build();
var handle = ClusterBean.of(Map.of());
var ingress = Mockito.spy(Map.of(TopicPartition.of("topic-0"), 1024L));
var egress = Mockito.spy(Map.of(TopicPartition.of("topic-0"), 1024L));
var costFunction = Utils.construct(costClass, Configuration.EMPTY);

costFunction.setCalculation(handle, ingress, egress);

Assertions.assertDoesNotThrow(
() -> {
Mockito.verifyNoInteractions(ingress);
Mockito.verifyNoInteractions(egress);
},
"The ingress/egress maps should not be touched");
Assertions.assertDoesNotThrow(
() -> costFunction.clusterCost(cluster, handle),
"Trigger the cost evaluation, this should touch the specified calculations");
Assertions.assertDoesNotThrow(
() -> {
Mockito.verify(ingress, Mockito.atLeastOnce()).get(Mockito.any(TopicPartition.class));
Mockito.verify(egress, Mockito.atLeastOnce()).get(Mockito.any(TopicPartition.class));
},
"The ingress/egress maps should be touched");
}

@ParameterizedTest
@CsvSource({
"BYTES_IN_PER_SEC, org.astraea.common.cost.NetworkIngressCost",
Expand Down