Skip to content

Commit

Permalink
MINOR: ZkMetadataCache should be in kafka.server.metadata apache#10956
Browse files Browse the repository at this point in the history
Put ZkMetadataCache in the kafka.server.metadata package rather than the kafka.server package, so
that its package is consistent with its position in the source directory hierarchy.

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
dengziming authored and cmccabe committed Oct 26, 2021
1 parent 82d5e1c commit 53f5c26
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 19 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import kafka.log.LogManager
import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
import kafka.network.{RequestChannel, SocketServer}
import kafka.security.CredentialProvider
import kafka.server.metadata.ZkConfigRepository
import kafka.server.metadata.{ZkConfigRepository, ZkMetadataCache}
import kafka.utils._
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/server/MetadataCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
package kafka.server

import kafka.admin.BrokerMetadata

import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}

import java.util

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/MetadataSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.server

import kafka.controller.KafkaController
import kafka.network.RequestChannel
import kafka.server.metadata.KRaftMetadataCache
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.requests.AbstractResponse

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock

import com.yammer.metrics.core.Meter
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition}
Expand All @@ -34,6 +33,7 @@ import kafka.server.{FetchMetadata => SFetchMetadata}
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import kafka.server.metadata.ZkMetadataCache
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@
* limitations under the License.
*/

package kafka.server
package kafka.server.metadata

import java.util
import java.util.Collections
import java.util.concurrent.locks.ReentrantReadWriteLock

import kafka.admin.BrokerMetadata

import scala.collection.{Seq, Set, mutable}
import scala.jdk.CollectionConverters._
import kafka.cluster.{Broker, EndPoint}
import kafka.api._
import kafka.controller.StateChangeLogger
import kafka.server.MetadataCache
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import kafka.utils.Implicits._
Expand Down
161 changes: 161 additions & 0 deletions core/src/test/scala/unit/kafka/server/KRaftMetadataTest.scala.tmp
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
import kafka.utils.TestUtils
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
import org.apache.kafka.common.metadata.MetadataRecordType.{FEATURE_LEVEL_RECORD, REGISTER_BROKER_RECORD}
import org.apache.kafka.common.metadata.{FeatureLevelRecord, RegisterBrokerRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.metadata.{FeatureMapAndEpoch, VersionRange}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.extension.ExtendWith

import scala.jdk.CollectionConverters._

@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 3, controllers = 3)
@Tag("integration")
/**
 * Integration tests for feature-version metadata propagation in a KRaft cluster:
 * finalized feature levels written through the controller must become visible to
 * brokers, and broker-supported feature ranges must become visible to the controller.
 */
class KRaftMetadataTest(cluster: ClusterInstance) {

  // Test fixtures, populated at the start of each @ClusterTest method
  // from the running cluster instance.
  var brokerServers: Seq[BrokerServer] = _
  var controllerServers: Seq[ControllerServer] = _
  var activeController: ControllerServer = _
  // NOTE(review): assigned in both tests but never read afterwards — kept for
  // fixture parity; confirm whether it can be dropped.
  var epoch: Int = _

  /**
   * Appends the given finalized-feature records to the active controller, waits
   * until every broker's feature cache has replayed metadata up to the resulting
   * offset, and then waits until the controller can serve its finalized features.
   */
  def updateFinalizedVersion(apiVersionAndMessages: List[ApiMessageAndVersion]): Unit = {
    val offset = updateMetadata(apiVersionAndMessages)
    brokerServers.foreach(s => {
      // Block until this broker has caught up to `offset` (bounded by the
      // configured connection timeout).
      s.featureCache.waitUntilEpochOrThrow(offset, s.config.zkConnectionTimeoutMs)
    })
    TestUtils.waitUntilTrue(
      () => try {
        // Succeeds once the controller exposes a finalized-features snapshot.
        activeController.controller.finalizedFeatures().get()
        true
      } catch {
        case _: Throwable => false
      },
      "Controller did not get broker updates"
    )
  }

  /**
   * Sets the supported feature range on each target broker, registers it with the
   * controller via a RegisterBrokerRecord, and waits until the controller's view
   * of every broker's supported features matches `features`.
   */
  def updateSupportedVersion(features: Features[SupportedVersionRange],
                             targetServers: Seq[BrokerServer]): Unit = {
    targetServers.foreach(brokerServer => {
      // The broker must have completed registration before we mutate its features.
      TestUtils.waitUntilTrue(() => brokerServer.lifecycleManager.brokerEpoch != -1, "broker registration failed")
      brokerServer.brokerFeatures.setSupportedFeatures(features)
      updateMetadata(List(toApiMessageAndVersion(features, brokerServer)))
    })

    val brokerRegistrations = activeController.controller.asInstanceOf[QuorumController].brokerRegistrations()
    brokerRegistrations.asScala.foreach { case (_, value) =>
      TestUtils.waitUntilTrue(
        () => value.supportedFeatures().asScala == toVersionRanges(features),
        "Controller did not get broker updates"
      )
    }
  }

  /** Converts a supported-feature map to the controller-side VersionRange form. */
  def toVersionRanges(features: Features[SupportedVersionRange]): Map[String, VersionRange] = {
    features.features().asScala.map { case (key, value) =>
      (key, new VersionRange(value.min(), value.max()))
    }.toMap
  }

  /**
   * Builds a RegisterBrokerRecord (wrapped as an ApiMessageAndVersion) advertising
   * `features` for the given broker.
   *
   * NOTE(review): `networkListeners` is populated from the broker's advertised
   * listeners but is not attached to the record (an empty endpoint collection is
   * set instead) — confirm whether the listeners were meant to be included.
   */
  def toApiMessageAndVersion(features: Features[SupportedVersionRange],
                             brokerServer: BrokerServer): ApiMessageAndVersion = {
    val networkListeners = new ListenerCollection()
    brokerServer.config.advertisedListeners.foreach { ep =>
      networkListeners.add(new Listener().
        setHost(ep.host).
        setName(ep.listenerName.value()).
        setPort(ep.port).
        setSecurityProtocol(ep.securityProtocol.id))
    }

    val featureCollection = new RegisterBrokerRecord.BrokerFeatureCollection()
    features.features().asScala.foreach{ feature =>
      featureCollection.add(new RegisterBrokerRecord.BrokerFeature()
        .setName(feature._1)
        .setMinSupportedVersion(feature._2.min())
        .setMaxSupportedVersion(feature._2.max()))
    }
    new ApiMessageAndVersion(
      new RegisterBrokerRecord()
        .setBrokerId(brokerServer.config.nodeId)
        .setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection())
        .setBrokerEpoch(brokerServer.lifecycleManager.brokerEpoch)
        .setFeatures(featureCollection),
      REGISTER_BROKER_RECORD.highestSupportedVersion()
    )
  }

  /**
   * Appends the given metadata records to the active controller and blocks until
   * the Raft layer acknowledges them.
   *
   * @return the log offset of the appended records
   */
  def updateMetadata(apiVersionAndMessages: List[ApiMessageAndVersion]): Long = {
    // Append to controller
    val offset = activeController.controller.asInstanceOf[QuorumController].updateMetadata(apiVersionAndMessages.asJava)
    // Wait for the Raft response before returning the offset.
    offset.get()
  }

  /** Fetches the controller's current finalized-features snapshot. */
  def getFeatureMetadataData(): FeatureMapAndEpoch =
    activeController.controller.finalizedFeatures().get()

  @ClusterTest
  def testUpdateFinalizedVersion(): Unit = {
    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
    activeController = raftCluster.activeController()
    brokerServers = raftCluster.brokerServers().asScala.toSeq
    controllerServers = raftCluster.controllerServers().asScala.toSeq
    epoch = activeController.controller.curClaimEpoch()

    updateFinalizedVersion(
      List(
        new ApiMessageAndVersion(
          new FeatureLevelRecord()
            .setName("feature")
            .setMinFeatureLevel(1)
            .setMaxFeatureLevel(2),
          FEATURE_LEVEL_RECORD.highestSupportedVersion()
        )
      )
    )

    // The controller must be able to serve the finalized features we just wrote.
    // (Replaces a debug println; the propagation itself is verified inside
    // updateFinalizedVersion.)
    assert(getFeatureMetadataData() != null)
  }

  @ClusterTest
  def testUpdateSupportedVersion(): Unit = {
    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
    activeController = raftCluster.activeController()
    brokerServers = raftCluster.brokerServers().asScala.toSeq
    controllerServers = raftCluster.controllerServers().asScala.toSeq
    epoch = activeController.controller.curClaimEpoch()

    updateSupportedVersion(
      Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)))),
      brokerServers
    )
  }

}
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
import kafka.log.AppendOrigin
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository}
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache}
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package kafka.server

import org.apache.kafka.common.{Node, TopicPartition, Uuid}

import java.util
import util.Arrays.asList

import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
Expand All @@ -30,10 +30,10 @@ import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import java.util.Collections

import java.util.Collections
import kafka.api.LeaderAndIsr
import kafka.server.metadata.KRaftMetadataCache
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.common.metadata.{PartitionRecord, RegisterBrokerRecord, RemoveTopicRecord, TopicRecord}
import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
package kafka.server

import java.util.{Collections, Optional}

import kafka.api.Request
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log.{UnifiedLog, LogManager}
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.metadata.ZkMetadataCache
import kafka.utils.{DelayedItem, TestUtils}
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.message.UpdateMetadataRequestData
import org.apache.kafka.common.protocol.{Errors, ApiKeys}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package kafka.server

import kafka.api.{ApiVersion, KAFKA_2_6_IV0}
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log.{UnifiedLog, LogAppendInfo, LogManager}
import kafka.log.{LogAppendInfo, LogManager, UnifiedLog}
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.server.metadata.ZkMetadataCache
import kafka.utils.TestUtils
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.message.{FetchResponseData, UpdateMetadataRequestData}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ZkMetadataCache;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.MockConfigRepository;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.KafkaScheduler;
import kafka.utils.MockTime;
import kafka.utils.Pool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
import kafka.server.ZkMetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
Expand All @@ -41,6 +40,7 @@
import kafka.server.ZkSupport;
import kafka.server.builders.KafkaApisBuilder;
import kafka.server.metadata.MockConfigRepository;
import kafka.server.metadata.ZkMetadataCache;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.ApiMessageType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import kafka.server.LogDirFailureChannel;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ZkMetadataCache;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.ConfigRepository;
import kafka.server.metadata.MockConfigRepository;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.KafkaScheduler;
import kafka.utils.Scheduler;
import kafka.utils.TestUtils;
Expand Down

0 comments on commit 53f5c26

Please sign in to comment.