Add flag for interactive use case
patelh committed Dec 18, 2015
1 parent fc6f2fd commit 4ebbb12
Showing 1 changed file with 22 additions and 19 deletions.
app/kafka/manager/actor/cluster/KafkaStateActor.scala: 22 additions & 19 deletions
@@ -72,10 +72,7 @@ trait OffsetCache extends Logging {
     .build(
       new CacheLoader[String,Future[PartitionOffsetsCapture]] {
         def load(topic: String): Future[PartitionOffsetsCapture] = {
-          if (loadOffsets)
-            loadPartitionOffsets(topic)
-          else
-            emptyPartitionOffsetsCapture
+          loadPartitionOffsets(topic)
         }
       }
     )
@@ -143,7 +140,7 @@ trait OffsetCache extends Logging {
 
   protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]]
 
-  protected def getTopicDescription(topic: String) : Option[TopicDescription]
+  protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription]
 
   protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, Long]
 
@@ -198,7 +195,13 @@ trait OffsetCache extends Logging {
 
   def stop()
 
-  def getTopicPartitionOffsets(topic: String) : Future[PartitionOffsetsCapture] = partitionOffsetsCache.get(topic)
+  def getTopicPartitionOffsets(topic: String, interactive: Boolean) : Future[PartitionOffsetsCapture] = {
+    if(interactive || loadOffsets) {
+      partitionOffsetsCache.get(topic)
+    } else {
+      emptyPartitionOffsetsCapture
+    }
+  }
 
   def lastUpdateMillis : Long
 
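Taken together with the CacheLoader change in the first hunk, this moves the loadOffsets check from cache-load time to lookup time: the loader now always fetches, and getTopicPartitionOffsets decides whether to consult the cache at all, so an interactive request can force an offset fetch even when background offset loading is disabled. A minimal self-contained sketch of that gating, assuming Guava on the classpath; fetchOffsets and the String payload are placeholders for loadPartitionOffsets and Future[PartitionOffsetsCapture]:

import java.util.concurrent.TimeUnit
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object OffsetGateSketch extends App {
  // Hard-coded false here to show the interactive override; in the real
  // class this flag comes from the cluster configuration.
  val loadOffsets = false
  val emptyCapture = "empty capture"

  // Placeholder for loadPartitionOffsets(topic).
  def fetchOffsets(topic: String): String = s"offsets($topic)"

  // The loader now always fetches; the gate lives in the accessor below.
  val cache: LoadingCache[String, String] = CacheBuilder.newBuilder()
    .expireAfterWrite(10, TimeUnit.SECONDS)
    .build(new CacheLoader[String, String] {
      def load(topic: String): String = fetchOffsets(topic)
    })

  def getTopicPartitionOffsets(topic: String, interactive: Boolean): String =
    if (interactive || loadOffsets) cache.get(topic) else emptyCapture

  println(getTopicPartitionOffsets("t1", interactive = true))  // offsets(t1)
  println(getTopicPartitionOffsets("t1", interactive = false)) // empty capture
}
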
@@ -209,14 +212,14 @@ trait OffsetCache extends Logging {
     }
 
     val topicDescriptions: Map[String, ConsumedTopicDescription] = consumerTopics.map { topic =>
-      val topicDesc = getConsumedTopicDescription(consumer, topic)
+      val topicDesc = getConsumedTopicDescription(consumer, topic, false)
       (topic, topicDesc)
     }.toMap
     ConsumerDescription(consumer, topicDescriptions)
   }
 
-  final def getConsumedTopicDescription(consumer:String, topic:String) : ConsumedTopicDescription = {
-    val optTopic = getTopicDescription(topic)
+  final def getConsumedTopicDescription(consumer:String, topic:String, interactive: Boolean) : ConsumedTopicDescription = {
+    val optTopic = getTopicDescription(topic, interactive)
     val optTpi = optTopic.map(TopicIdentity.getTopicPartitionIdentity(_, None))
     val partitionOffsets = for {
       td <- optTopic
@@ -242,7 +245,7 @@ trait OffsetCache extends Logging {
 case class OffsetCacheActive(curator: CuratorFramework,
                              clusterContext: ClusterContext,
                              partitionLeaders: String => Option[List[(Int, Option[BrokerIdentity])]],
-                             topicDescriptions: String => Option[TopicDescription],
+                             topicDescriptions: (String, Boolean) => Option[TopicDescription],
                              cacheTimeoutSecs: Int,
                              socketTimeoutMillis: Int,
                              kafkaVersion: KafkaVersion)
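The topicDescriptions collaborator widens from String => Option[TopicDescription] to a two-argument function, so the interactive flag can be threaded through without the cache knowing where descriptions come from; OffsetCachePassive below gets the same change. A small sketch, with placeholder types rather than the project's own, of wiring a two-argument method into such a parameter via eta-expansion:

// Placeholder stand-in; the real type lives in kafka-manager's model.
case class TopicDescription(topic: String)

// Mirrors the widened topicDescriptions constructor parameter above.
case class OffsetCacheSketch(topicDescriptions: (String, Boolean) => Option[TopicDescription])

object WiringSketch extends App {
  // Stand-in for KafkaStateActor's two-argument getTopicDescription.
  def getTopicDescription(topic: String, interactive: Boolean): Option[TopicDescription] =
    Some(TopicDescription(topic))

  // Eta-expansion turns the method into the expected function value.
  val cache = OffsetCacheSketch(getTopicDescription)
  println(cache.topicDescriptions("my-topic", true)) // Some(TopicDescription(my-topic))
}
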
@@ -279,7 +282,7 @@ case class OffsetCacheActive(curator: CuratorFramework,
 
   protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)
 
-  protected def getTopicDescription(topic: String) : Option[TopicDescription] = topicDescriptions(topic)
+  protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive)
 
   def start(): Unit = {
     info("Starting consumers tree cache...")
@@ -370,7 +373,7 @@ case class OffsetCacheActive(curator: CuratorFramework,
 case class OffsetCachePassive(curator: CuratorFramework,
                               clusterContext: ClusterContext,
                               partitionLeaders: String => Option[List[(Int, Option[BrokerIdentity])]],
-                              topicDescriptions: String => Option[TopicDescription],
+                              topicDescriptions: (String, Boolean) => Option[TopicDescription],
                               cacheTimeoutSecs: Int,
                               socketTimeoutMillis: Int,
                               kafkaVersion: KafkaVersion)
@@ -407,7 +410,7 @@ case class OffsetCachePassive(curator: CuratorFramework,
 
   protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)
 
-  protected def getTopicDescription(topic: String) : Option[TopicDescription] = topicDescriptions(topic)
+  protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive)
 
   def start(): Unit = {
     info("Starting consumers path children cache...")
@@ -664,7 +667,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
     Option(topicsTreeCache.getCurrentData(topicPath)).map( childData => (childData.getStat.getVersion,asString(childData.getData)))
   }
 
-  def getTopicDescription(topic: String) : Option[TopicDescription] = {
+  def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = {
     for {
       description <- getTopicZookeeperData(topic)
       partitionsPath = "%s/%s/partitions".format(ZkUtils.BrokerTopicsPath, topic)
@@ -673,7 +676,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
         val statePath = s"$partitionsPath/$part/state"
         Option(topicsTreeCache.getCurrentData(statePath)).map(cd => (part, asString(cd.getData)))
       }
-      partitionOffsets = offsetCache.getTopicPartitionOffsets(topic)
+      partitionOffsets = offsetCache.getTopicPartitionOffsets(topic, interactive)
       topicConfig = getTopicConfigString(topic)
     } yield TopicDescription(topic, description, Option(states), partitionOffsets, topicConfig)
   }
@@ -768,10 +771,10 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
       sender ! TopicConfig(topic, getTopicConfigString(topic))
 
     case KSGetTopicDescription(topic) =>
-      sender ! getTopicDescription(topic)
+      sender ! getTopicDescription(topic, false)
 
     case KSGetTopicDescriptions(topics) =>
-      sender ! TopicDescriptions(topics.toIndexedSeq.flatMap(getTopicDescription), topicsTreeCacheLastUpdateMillis)
+      sender ! TopicDescriptions(topics.toIndexedSeq.flatMap(getTopicDescription(_, false)), topicsTreeCacheLastUpdateMillis)
 
     case KSGetConsumerDescription(consumer) =>
       asyncPipeToSender {
@@ -780,7 +783,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
 
     case KSGetConsumedTopicDescription(consumer, topic) =>
       asyncPipeToSender {
-        offsetCache.getConsumedTopicDescription(consumer, topic)
+        offsetCache.getConsumedTopicDescription(consumer, topic, true)
       }
 
     case KSGetAllTopicDescriptions(lastUpdateMillisOption) =>
@@ -793,7 +796,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
       }.fold {
         sender ! TopicDescriptions(IndexedSeq.empty, topicsTreeCacheLastUpdateMillis)
       } { data: java.util.Map[String, ChildData] =>
-        sender ! TopicDescriptions(data.asScala.keys.toIndexedSeq.flatMap(getTopicDescription(_, false)), topicsTreeCacheLastUpdateMillis)
+        sender ! TopicDescriptions(data.asScala.keys.toIndexedSeq.flatMap(getTopicDescription(_, false)), topicsTreeCacheLastUpdateMillis)
       }
     } // else no updates to send
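Net effect of the dispatch changes: only KSGetConsumedTopicDescription, which backs a user actually viewing a consumed topic, passes interactive = true; the bulk description paths all pass false, so they can no longer trigger on-demand offset fetches. A simplified sketch of that policy, with shortened message names standing in for the actor's real protocol:

sealed trait Request
case class GetTopicDescription(topic: String) extends Request
case class GetTopicDescriptions(topics: Seq[String]) extends Request
case class GetConsumedTopicDescription(consumer: String, topic: String) extends Request

object InteractivePolicySketch extends App {
  // Only the single, user-driven consumed-topic lookup is interactive.
  def isInteractive(r: Request): Boolean = r match {
    case GetConsumedTopicDescription(_, _) => true  // force offset load for the page view
    case _                                 => false // bulk paths stay cheap
  }

  println(isInteractive(GetConsumedTopicDescription("grp", "t1"))) // true
  println(isInteractive(GetTopicDescriptions(Seq("t1", "t2"))))    // false
}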
