Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Add basic IN/OUT bytes metrics (#1187)
Browse files Browse the repository at this point in the history
### Motivation

We need a way to measure the network traffic originating directly from KOP clients

### Modifications

Add two simple metrics KOP_NETWORK_BYTES_IN and KOP_NETWORK_BYTES_OUT.

Please note that these metrics are useful to track user usage of resources.
Especially on the Consumer side: Kafka uses a pull model, so even when a topic is empty, an active Consumer continues to send FETCH requests (and this is a network cost)

(cherry picked from commit b6be650)
  • Loading branch information
eolivelli authored and BewareMyPower committed May 20, 2022
1 parent 9331ad4 commit bc88ed6
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ protected Boolean channelReady() {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Get a buffer that contains the full frame
ByteBuf buffer = (ByteBuf) msg;
requestStats.getNetworkTotalBytesIn().add(buffer.readableBytes());

// Update parse request latency metrics
final BiConsumer<Long, Throwable> registerRequestParseLatency = (timeBeforeParse, throwable) -> {
Expand Down Expand Up @@ -405,7 +406,7 @@ protected void writeAndFlushResponseToClient(Channel channel) {
if (responseFuture.isCompletedExceptionally()) {
responseFuture.exceptionally(e -> {
log.error("[{}] request {} completed exceptionally", channel, request.getHeader(), e);
channel.writeAndFlush(request.createErrorResponse(e));
sendErrorResponse(request, channel, e);

requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY)
.registerFailedEvent(nanoSecondsSinceCreated, TimeUnit.NANOSECONDS);
Expand All @@ -421,7 +422,7 @@ protected void writeAndFlushResponseToClient(Channel channel) {
// It should not be null, just check it for safety
log.error("[{}] Unexpected null completed future for request {}",
ctx.channel(), request.getHeader());
channel.writeAndFlush(request.createErrorResponse(new ApiException("response is null")));
sendErrorResponse(request, channel, new ApiException("response is null"));
return;
}
if (log.isDebugEnabled()) {
Expand All @@ -432,12 +433,15 @@ protected void writeAndFlushResponseToClient(Channel channel) {
}

final ByteBuf result = responseToByteBuf(response, request);
final int resultSize = result.readableBytes();
channel.writeAndFlush(result).addListener(future -> {
if (response instanceof ResponseCallbackWrapper) {
((ResponseCallbackWrapper) response).responseComplete();
}
if (!future.isSuccess()) {
log.error("[{}] Failed to write {}", channel, request.getHeader(), future.cause());
} else {
requestStats.getNetworkTotalBytesOut().add(resultSize);
}
});
requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY)
Expand All @@ -451,14 +455,23 @@ protected void writeAndFlushResponseToClient(Channel channel) {
log.error("[{}] request {} is not completed for {} ns (> {} ms)",
channel, request.getHeader(), nanoSecondsSinceCreated, kafkaConfig.getRequestTimeoutMs());
responseFuture.cancel(true);
channel.writeAndFlush(
request.createErrorResponse(new ApiException("request is expired from server side")));
sendErrorResponse(request, channel, new ApiException("request is expired from server side"));
requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY)
.registerFailedEvent(nanoSecondsSinceCreated, TimeUnit.NANOSECONDS);
}
}
}

/**
 * Encodes {@code customError} into an error response for the given request and writes it
 * to the client channel, adding the written size to the NETWORK_TOTAL_BYTES_OUT counter
 * when the write succeeds.
 *
 * @param request     the in-flight request being answered with an error
 * @param channel     the client channel to write the response to
 * @param customError the error to encode into the response body
 */
private void sendErrorResponse(KafkaHeaderAndRequest request, Channel channel, Throwable customError) {
    final ByteBuf result = request.createErrorResponse(customError);
    // Capture the size before writeAndFlush: Netty releases the buffer once it is written,
    // so readableBytes() must not be called inside the listener.
    final int resultSize = result.readableBytes();
    channel.writeAndFlush(result).addListener(future -> {
        if (future.isSuccess()) {
            requestStats.getNetworkTotalBytesOut().add(resultSize);
        } else {
            // Keep failure handling consistent with writeAndFlushResponseToClient,
            // which logs failed writes instead of dropping them silently.
            log.error("[{}] Failed to write error response for {}",
                    channel, request.getHeader(), future.cause());
        }
    });
}

protected abstract boolean hasAuthenticated();

protected abstract void channelPrepare(ChannelHandlerContext ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,10 @@ public interface KopServerStats {
String KOP_EVENT_QUEUE_SIZE = "KOP_EVENT_QUEUE_SIZE";
String KOP_EVENT_QUEUED_LATENCY = "KOP_EVENT_QUEUED_LATENCY";
String KOP_EVENT_LATENCY = "KOP_EVENT_LATENCY";

/**
* Network stats.
*/
String NETWORK_TOTAL_BYTES_IN = "NETWORK_TOTAL_BYTES_IN";
String NETWORK_TOTAL_BYTES_OUT = "NETWORK_TOTAL_BYTES_OUT";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_PUBLISH;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_QUEUED_LATENCY;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_READ;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_IN;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_OUT;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PREPARE_METADATA;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PRODUCE_ENCODE;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.REQUEST_PARSE_LATENCY;
Expand Down Expand Up @@ -121,6 +123,18 @@ public class RequestStats {
)
private final OpStatsLogger fetchDecodeStats;

@StatsDoc(
name = NETWORK_TOTAL_BYTES_IN,
help = "total bytes received"
)
private final Counter networkTotalBytesIn;

@StatsDoc(
name = NETWORK_TOTAL_BYTES_OUT,
help = "total bytes sent out"
)
private final Counter networkTotalBytesOut;

private final Map<ApiKeys, StatsLogger> apiKeysToStatsLogger = new ConcurrentHashMap<>();

public RequestStats(StatsLogger statsLogger) {
Expand All @@ -138,6 +152,8 @@ public RequestStats(StatsLogger statsLogger) {
this.prepareMetadataStats = statsLogger.getOpStatsLogger(PREPARE_METADATA);
this.messageReadStats = statsLogger.getOpStatsLogger(MESSAGE_READ);
this.fetchDecodeStats = statsLogger.getOpStatsLogger(FETCH_DECODE);
this.networkTotalBytesIn = statsLogger.getCounter(NETWORK_TOTAL_BYTES_IN);
this.networkTotalBytesOut = statsLogger.getCounter(NETWORK_TOTAL_BYTES_OUT);

statsLogger.registerGauge(REQUEST_QUEUE_SIZE, new Gauge<Number>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,28 @@ public void testMaxMessageSize() throws PulsarAdminException {
assertTrue(causeException instanceof RecordTooLargeException);
}

/**
 * Verifies that the network byte counters (NETWORK_TOTAL_BYTES_IN/OUT) advance
 * after a Kafka client produces a single record through the protocol handler.
 */
@Test
public void testNetworkMetrics() throws Exception {
    final String topicName = "testNetworkMetrics";

    // Create the target topic with a single partition.
    admin.topics().createPartitionedTopic(topicName, 1);

    final Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost" + ":" + getKafkaBrokerPort());
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    @Cleanup
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerConfig);
    // Block until the broker acknowledges the record, so the counters have been updated.
    kafkaProducer.send(new ProducerRecord<>(topicName, "key", "value")).get();

    final KafkaProtocolHandler protocolHandler = getProtocolHandler();
    final long totalBytesIn = protocolHandler.getRequestStats().getNetworkTotalBytesIn().get();
    final long totalBytesOut = protocolHandler.getRequestStats().getNetworkTotalBytesOut().get();

    // Both directions must have seen traffic from the produce round-trip.
    assertTrue(totalBytesIn > 0);
    assertTrue(totalBytesOut > 0);
}


@DataProvider(name = "allowAutoTopicCreation")
public static Object[][] allowAutoTopicCreation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public void testMetricsProvider() throws Exception {
Assert.assertTrue(sb.toString().contains("kop_server_PRODUCE_MESSAGE_CONVERSIONS{partition=\"0\","
+ "topic=\"kopKafkaProducePulsarMetrics1\"} 10"));
Assert.assertTrue(sb.toString().contains("kop_server_PRODUCE_MESSAGE_CONVERSIONS_TIME_NANOS"));

// network stats
Assert.assertTrue(sb.toString().contains("NETWORK_TOTAL_BYTES_IN"));
Assert.assertTrue(sb.toString().contains("NETWORK_TOTAL_BYTES_OUT"));
}

@Test(timeOut = 20000)
Expand Down

0 comments on commit bc88ed6

Please sign in to comment.