diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 1ae6434ade..048a38753c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -185,6 +185,7 @@ protected Boolean channelReady() { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // Get a buffer that contains the full frame ByteBuf buffer = (ByteBuf) msg; + requestStats.getNetworkTotalBytesIn().add(buffer.readableBytes()); // Update parse request latency metrics final BiConsumer registerRequestParseLatency = (timeBeforeParse, throwable) -> { @@ -405,7 +406,7 @@ protected void writeAndFlushResponseToClient(Channel channel) { if (responseFuture.isCompletedExceptionally()) { responseFuture.exceptionally(e -> { log.error("[{}] request {} completed exceptionally", channel, request.getHeader(), e); - channel.writeAndFlush(request.createErrorResponse(e)); + sendErrorResponse(request, channel, e); requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY) .registerFailedEvent(nanoSecondsSinceCreated, TimeUnit.NANOSECONDS); @@ -421,7 +422,7 @@ protected void writeAndFlushResponseToClient(Channel channel) { // It should not be null, just check it for safety log.error("[{}] Unexpected null completed future for request {}", ctx.channel(), request.getHeader()); - channel.writeAndFlush(request.createErrorResponse(new ApiException("response is null"))); + sendErrorResponse(request, channel, new ApiException("response is null")); return; } if (log.isDebugEnabled()) { @@ -432,12 +433,15 @@ protected void writeAndFlushResponseToClient(Channel channel) { } final ByteBuf result = responseToByteBuf(response, request); + final int resultSize = result.readableBytes(); channel.writeAndFlush(result).addListener(future -> { if (response instanceof ResponseCallbackWrapper) { ((ResponseCallbackWrapper) response).responseComplete(); } if (!future.isSuccess()) { log.error("[{}] Failed to write {}", channel, request.getHeader(), future.cause()); + } else { + requestStats.getNetworkTotalBytesOut().add(resultSize); } }); requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY) @@ -451,14 +455,23 @@ protected void writeAndFlushResponseToClient(Channel channel) { log.error("[{}] request {} is not completed for {} ns (> {} ms)", channel, request.getHeader(), nanoSecondsSinceCreated, kafkaConfig.getRequestTimeoutMs()); responseFuture.cancel(true); - channel.writeAndFlush( - request.createErrorResponse(new ApiException("request is expired from server side"))); + sendErrorResponse(request, channel, new ApiException("request is expired from server side")); requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY) .registerFailedEvent(nanoSecondsSinceCreated, TimeUnit.NANOSECONDS); } } } + private void sendErrorResponse(KafkaHeaderAndRequest request, Channel channel, Throwable customError) { + ByteBuf result = request.createErrorResponse(customError); + final int resultSize = result.readableBytes(); + channel.writeAndFlush(result).addListener(future -> { + if (future.isSuccess()) { + requestStats.getNetworkTotalBytesOut().add(resultSize); + } + }); + } + protected abstract boolean hasAuthenticated(); protected abstract void channelPrepare(ChannelHandlerContext ctx, diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopServerStats.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopServerStats.java index 06cca6ecc5..31d6557e34 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopServerStats.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopServerStats.java @@ -92,4 +92,10 @@ public interface KopServerStats { String KOP_EVENT_QUEUE_SIZE = "KOP_EVENT_QUEUE_SIZE"; String KOP_EVENT_QUEUED_LATENCY = "KOP_EVENT_QUEUED_LATENCY"; String KOP_EVENT_LATENCY = "KOP_EVENT_LATENCY"; + + /** + * Network stats. + */ + String NETWORK_TOTAL_BYTES_IN = "NETWORK_TOTAL_BYTES_IN"; + String NETWORK_TOTAL_BYTES_OUT = "NETWORK_TOTAL_BYTES_OUT"; } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/RequestStats.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/RequestStats.java index 9825cd09e5..13b8bfc7a2 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/RequestStats.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/RequestStats.java @@ -21,6 +21,8 @@ import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_PUBLISH; import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_QUEUED_LATENCY; import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_READ; +import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_IN; +import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_OUT; import static io.streamnative.pulsar.handlers.kop.KopServerStats.PREPARE_METADATA; import static io.streamnative.pulsar.handlers.kop.KopServerStats.PRODUCE_ENCODE; import static io.streamnative.pulsar.handlers.kop.KopServerStats.REQUEST_PARSE_LATENCY; @@ -121,6 +123,18 @@ public class RequestStats { ) private final OpStatsLogger fetchDecodeStats; + @StatsDoc( + name = NETWORK_TOTAL_BYTES_IN, + help = "total bytes received" + ) + private final Counter networkTotalBytesIn; + + @StatsDoc( + name = NETWORK_TOTAL_BYTES_OUT, + help = "total bytes sent out" + ) + private final Counter networkTotalBytesOut; + private final Map apiKeysToStatsLogger = new ConcurrentHashMap<>(); public RequestStats(StatsLogger statsLogger) { @@ -138,6 +152,8 @@ public RequestStats(StatsLogger statsLogger) { this.prepareMetadataStats = statsLogger.getOpStatsLogger(PREPARE_METADATA); this.messageReadStats = statsLogger.getOpStatsLogger(MESSAGE_READ); this.fetchDecodeStats = statsLogger.getOpStatsLogger(FETCH_DECODE); + this.networkTotalBytesIn = statsLogger.getCounter(NETWORK_TOTAL_BYTES_IN); + this.networkTotalBytesOut = statsLogger.getCounter(NETWORK_TOTAL_BYTES_OUT); statsLogger.registerGauge(REQUEST_QUEUE_SIZE, new Gauge() { @Override diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index cd9769eb2e..faae8b915d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -1093,6 +1093,28 @@ public void testMaxMessageSize() throws PulsarAdminException { assertTrue(causeException instanceof RecordTooLargeException); } + @Test + public void testNetworkMetrics() throws Exception { + String topicName = "testNetworkMetrics"; + + // create partitioned topic. + admin.topics().createPartitionedTopic(topicName, 1); + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost" + ":" + getKafkaBrokerPort()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + @Cleanup + KafkaProducer producer = new KafkaProducer<>(props); + producer.send(new ProducerRecord<>(topicName, "key", "value")).get(); + + KafkaProtocolHandler protocolHandler = getProtocolHandler(); + long bytesIn = protocolHandler.getRequestStats().getNetworkTotalBytesIn().get(); + long bytesOut = protocolHandler.getRequestStats().getNetworkTotalBytesOut().get(); + + assertTrue(bytesIn > 0); + assertTrue(bytesOut > 0); + } + @DataProvider(name = "allowAutoTopicCreation") public static Object[][] allowAutoTopicCreation() { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java index d5d9a5fff2..f31d2cd65a 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java @@ -230,6 +230,10 @@ public void testMetricsProvider() throws Exception { Assert.assertTrue(sb.toString().contains("kop_server_PRODUCE_MESSAGE_CONVERSIONS{partition=\"0\"," + "topic=\"kopKafkaProducePulsarMetrics1\"} 10")); Assert.assertTrue(sb.toString().contains("kop_server_PRODUCE_MESSAGE_CONVERSIONS_TIME_NANOS")); + + // network stats + Assert.assertTrue(sb.toString().contains("NETWORK_TOTAL_BYTES_IN")); + Assert.assertTrue(sb.toString().contains("NETWORK_TOTAL_BYTES_OUT")); } @Test(timeOut = 20000)