Skip to content

Commit

Permalink
Merge pull request #316 from EddDoubleD/topics-partition-consumer-stats
Browse files Browse the repository at this point in the history
feat: add record statistics of partitions and topics from proto
  • Loading branch information
pnv1 authored Sep 19, 2024
2 parents 4ff4686 + 1b01046 commit 33f70b1
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,9 @@ public Builder setBytesRead(MultipleWindowsStat bytesRead) {
this.bytesRead = bytesRead;
return this;
}

public ConsumerStats build() {
return new ConsumerStats(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,9 @@ public Builder setPartitionNodeId(int partitionNodeId) {
this.partitionNodeId = partitionNodeId;
return this;
}

public PartitionStats build() {
return new PartitionStats(this);
}
}
}
53 changes: 49 additions & 4 deletions topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.ConsumerStats;
import tech.ydb.topic.description.MeteringMode;
import tech.ydb.topic.description.MultipleWindowsStat;
import tech.ydb.topic.description.PartitionInfo;
import tech.ydb.topic.description.PartitionStats;
import tech.ydb.topic.description.SupportedCodecs;
import tech.ydb.topic.description.TopicDescription;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.SyncReader;
import tech.ydb.topic.read.impl.AsyncReaderImpl;
import tech.ydb.topic.read.impl.OffsetsRangeImpl;
import tech.ydb.topic.read.impl.SyncReaderImpl;
import tech.ydb.topic.settings.AlterConsumerSettings;
import tech.ydb.topic.settings.AlterPartitioningSettings;
Expand Down Expand Up @@ -260,8 +264,9 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) {
.setPartitionId(partition.getPartitionId())
.setActive(partition.getActive())
.setChildPartitionIds(partition.getChildPartitionIdsList())
.setParentPartitionIds(partition.getParentPartitionIdsList());
// TODO: read partition stats
.setParentPartitionIds(partition.getParentPartitionIdsList())
.setPartitionStats(fromProto(partition.getPartitionStats()));

partitions.add(partitionBuilder.build());
}
description.setPartitions(partitions);
Expand All @@ -285,8 +290,7 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) {
consumerSupportedCodecsBuilder.addCodec(codecFromProto(codec));
}
consumerBuilder.setSupportedCodecs(consumerSupportedCodecsBuilder.build());

// TODO: set consumer stats
consumerBuilder.setStats(fromProto(consumer.getConsumerStats()));

consumers.add(consumerBuilder.build());
}
Expand Down Expand Up @@ -399,6 +403,47 @@ private static YdbTopic.SupportedCodecs toProto(SupportedCodecs supportedCodecs)
return codecsBuilder.build();
}

private static PartitionStats fromProto(YdbTopic.PartitionStats partitionStats) {
return PartitionStats.newBuilder()
.setPartitionOffsets(
new OffsetsRangeImpl(
partitionStats.getPartitionOffsets().getStart(),
partitionStats.getPartitionOffsets().getEnd()
)
).setStoreSizeBytes(
partitionStats.getStoreSizeBytes()
).setLastWriteTime(
ProtobufUtils.protoToInstant(partitionStats.getLastWriteTime())
).setMaxWriteTimeLag(
ProtobufUtils.protoToDuration(partitionStats.getMaxWriteTimeLag())
).setBytesWritten(
new MultipleWindowsStat(
partitionStats.getBytesWritten().getPerMinute(),
partitionStats.getBytesWritten().getPerHour(),
partitionStats.getBytesWritten().getPerDay()
)
).setPartitionNodeId(
partitionStats.getPartitionNodeId()
).build();
}

private static ConsumerStats fromProto(YdbTopic.Consumer.ConsumerStats consumerStats) {
return ConsumerStats.newBuilder()
.setMinPartitionsLastReadTime(
ProtobufUtils.protoToInstant(consumerStats.getMinPartitionsLastReadTime())
).setMaxReadTimeLag(
ProtobufUtils.protoToDuration(consumerStats.getMaxReadTimeLag())
).setMaxWriteTimeLag(
ProtobufUtils.protoToDuration(consumerStats.getMaxWriteTimeLag())
).setBytesRead(
new MultipleWindowsStat(
consumerStats.getBytesRead().getPerMinute(),
consumerStats.getBytesRead().getPerHour(),
consumerStats.getBytesRead().getPerDay()
)
).build();
}

@Override
public void close() {
logger.debug("TopicClientImpl.close() is called");
Expand Down

0 comments on commit 33f70b1

Please sign in to comment.