Skip to content

Commit

Permalink
[COST] Enable to tune the estimation method of NetworkCost (#1643)
Browse files Browse the repository at this point in the history
  • Loading branch information
garyparrot authored Apr 8, 2023
1 parent 71e8d96 commit 94f5a29
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 25 deletions.
40 changes: 40 additions & 0 deletions common/src/main/java/org/astraea/common/cost/EstimationMethod.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.cost;

import org.astraea.common.EnumInfo;

/** Method to estimate the partition bandwidth */
enum EstimationMethod implements EnumInfo {
BROKER_TOPIC_ONE_MINUTE_RATE,
BROKER_TOPIC_FIVE_MINUTE_RATE,
BROKER_TOPIC_FIFTEEN_MINUTE_RATE;

static EstimationMethod ofAlias(String alias) {
return EnumInfo.ignoreCaseEnum(EstimationMethod.class, alias);
}

@Override
public String alias() {
return name();
}

@Override
public String toString() {
return alias();
}
}
34 changes: 31 additions & 3 deletions common/src/main/java/org/astraea/common/cost/NetworkCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.common.Configuration;
import org.astraea.common.DataRate;
import org.astraea.common.EnumInfo;
import org.astraea.common.admin.BrokerTopic;
Expand All @@ -36,7 +37,6 @@
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.cost.utils.ClusterInfoSensor;
import org.astraea.common.metrics.HasBeanObject;
import org.astraea.common.metrics.broker.HasRate;
import org.astraea.common.metrics.broker.LogMetrics;
import org.astraea.common.metrics.broker.ServerMetrics;
import org.astraea.common.metrics.collector.MetricSensor;
Expand All @@ -60,17 +60,32 @@
* every consumer fetches data from the leader(which is the default behavior of Kafka). For
* more detail about consumer rack awareness or how consumer can fetch data from the closest
* replica, see <a href="https://cwiki.apache.org/confluence/x/go_zBQ">KIP-392<a>.
* <li>NetworkCost implementation use broker-topic bandwidth rate and some other info to estimate
* the broker-topic-partition bandwidth rate. The implementation assume the broker-topic
* bandwidth is correct and steadily reflect the actual resource usage. This is generally true
* when the broker has reach its steady state, but to reach that state might takes awhile. And
* based on our observation this probably won't happen at the early broker start (see <a
* href="https://github.com/skiptests/astraea/issues/1641">Issue #1641</a>). We suggest use
* this cost with metrics from the servers in steady state.
* </ol>
*/
public abstract class NetworkCost implements HasClusterCost {

public static final String NETWORK_COST_ESTIMATION_METHOD = "network.cost.estimation.method";

private final EstimationMethod estimationMethod;
private final BandwidthType bandwidthType;
private final Map<ClusterBean, CachedCalculation> calculationCache;
private final ClusterInfoSensor clusterInfoSensor = new ClusterInfoSensor();

NetworkCost(BandwidthType bandwidthType) {
NetworkCost(Configuration config, BandwidthType bandwidthType) {
this.bandwidthType = bandwidthType;
this.calculationCache = new ConcurrentHashMap<>();
this.estimationMethod =
config
.string(NETWORK_COST_ESTIMATION_METHOD)
.map(EstimationMethod::ofAlias)
.orElse(EstimationMethod.BROKER_TOPIC_ONE_MINUTE_RATE);
}

void noMetricCheck(ClusterBean clusterBean) {
Expand Down Expand Up @@ -221,7 +236,20 @@ Map<TopicPartition, Long> estimateRate(
.brokerTopicMetrics(bt, ServerMetrics.Topic.Meter.class)
.filter(bean -> bean.type().equals(metric))
.max(Comparator.comparingLong(HasBeanObject::createdTimestamp))
.map(HasRate::fifteenMinuteRate)
.map(
hasRate -> {
switch (estimationMethod) {
case BROKER_TOPIC_ONE_MINUTE_RATE:
return hasRate.oneMinuteRate();
case BROKER_TOPIC_FIVE_MINUTE_RATE:
return hasRate.fiveMinuteRate();
case BROKER_TOPIC_FIFTEEN_MINUTE_RATE:
return hasRate.fifteenMinuteRate();
default:
throw new IllegalStateException(
"Unknown estimation method: " + estimationMethod);
}
})
// no load metric for this partition, treat as zero load
.orElse(0.0);
if (Double.isNaN(totalShare) || totalShare < 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package org.astraea.common.cost;

import org.astraea.common.Configuration;

/**
* A cost function to evaluate cluster load balance score in terms of message egress data rate. See
* {@link NetworkCost} for further detail.
*/
public class NetworkEgressCost extends NetworkCost {
public NetworkEgressCost() {
super(BandwidthType.Egress);
public NetworkEgressCost(Configuration config) {
super(config, BandwidthType.Egress);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class NetworkIngressCost extends NetworkCost implements HasPartitionCost
private static final String TRAFFIC_INTERVAL = "traffic.interval";

public NetworkIngressCost(Configuration config) {
super(BandwidthType.Ingress);
super(config, BandwidthType.Ingress);
this.config = config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@

public interface HasRate extends HasBeanObject {
default double meanRate() {
return (double) beanObject().attributes().getOrDefault("MeanRate", 0);
return (double) beanObject().attributes().getOrDefault("MeanRate", 0.0);
}

default double oneMinuteRate() {
return (double) beanObject().attributes().getOrDefault("OneMinuteRate", 0);
return (double) beanObject().attributes().getOrDefault("OneMinuteRate", 0.0);
}

default double fiveMinuteRate() {
return (double) beanObject().attributes().getOrDefault("FiveMinuteRate", 0);
return (double) beanObject().attributes().getOrDefault("FiveMinuteRate", 0.0);
}

default double fifteenMinuteRate() {
return (double) beanObject().attributes().getOrDefault("FifteenMinuteRate", 0);
return (double) beanObject().attributes().getOrDefault("FifteenMinuteRate", 0.0);
}

default TimeUnit rateUnit() {
Expand Down
Loading

0 comments on commit 94f5a29

Please sign in to comment.