A Kafka cluster consists of a set of broker nodes. Among these brokers, one node is elected as the leader of all brokers (known as the Kafka Controller), while the remaining brokers act as followers. The Kafka Controller manages the state of all topic partitions and replicas in the cluster, coordinates the operation of all broker nodes, and is also responsible for the interaction between Kafka and ZK. Unless otherwise noted, "Kafka Controller" below refers to the broker holding the leader role.
Definition and Startup of the KafkaController Component
Kafka defines the KafkaController class to model the Kafka Controller. Its fields are defined as follows:
class KafkaController(val config: KafkaConfig, // broker configuration
                      zkUtils: ZkUtils, // utility for interacting with ZK
                      val brokerState: BrokerState, // state of the local broker node
                      time: Time, // time utility
                      metrics: Metrics,
                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {

  private var isRunning = true
  val controllerContext = new ControllerContext(zkUtils)
  val partitionStateMachine = new PartitionStateMachine(this)
  val replicaStateMachine = new ReplicaStateMachine(this)
  private val controllerElector = new ZookeeperLeaderElector(
    controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId, time)
  private val autoRebalanceScheduler = new KafkaScheduler(1)
  var deleteTopicManager: TopicDeletionManager = _
  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
  private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
  private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
  private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
  private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
  private val partitionReassignedListener = new PartitionsReassignedListener(this)
  private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
  private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
}
When the Kafka service starts, every broker node creates its own Kafka Controller instance and calls the KafkaController#startup method to start it:
def startup(): Unit = {
  inLock(controllerContext.controllerLock) {
    info("Controller starting up")
    this.registerSessionExpirationListener()
    isRunning = true
    controllerElector.startup
    info("Controller startup complete")
  }
}
During startup, the controller registers a SessionExpirationListener to monitor the connection between the Kafka Controller and ZK, and starts the failover mechanism, which is also what elects the initial leader for the cluster. Here we only outline the startup flow; the ZK listener mechanism and the failover mechanism are analyzed in dedicated sections below.
Controller Context Management
The ControllerContext class manages the Kafka Controller's context information and provides the ability to establish connections to, and communicate with, every broker in the cluster. Its fields are defined as follows:
class ControllerContext(val zkUtils: ZkUtils) {
  var controllerChannelManager: ControllerChannelManager = _
  var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
  var epoch: Int = KafkaController.InitialControllerEpoch - 1
  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
  var allTopics: Set[String] = Set.empty
  var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
  var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
  val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
  val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
  private var liveBrokersUnderlying: Set[Broker] = Set.empty
  private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
}
ControllerContext also provides methods for managing these fields; the implementations are straightforward and are not covered here.
Let us now focus on the ControllerChannelManager class, which establishes connections to all broker nodes in the cluster and communicates with them. It maintains the ControllerChannelManager#brokerStateInfo field, which records the communication-related state for each broker. When a ControllerChannelManager is instantiated, it calls ControllerChannelManager#addNewBroker to initialize this field, constructing a ControllerBrokerStateInfo object for every live broker. Each such object wraps the target broker's node information, a network client, a queue that buffers pending requests, and the request-sending thread. The ControllerBrokerStateInfo case class is defined as follows:
case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                     brokerNode: Node,
                                     messageQueue: BlockingQueue[QueueItem],
                                     requestSendThread: RequestSendThread)
The ControllerChannelManager#addNewBroker method is implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 private def addNewBroker (broker: Broker ) { val messageQueue = new LinkedBlockingQueue [QueueItem ] debug("Controller %d trying to connect to broker %d" .format(config.brokerId, broker.id)) val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName) val brokerNode = new Node (broker.id, brokerEndPoint.host, brokerEndPoint.port) val networkClient = { val channelBuilder = ChannelBuilders .clientChannelBuilder( config.interBrokerSecurityProtocol, LoginType .SERVER , config.values, config.saslMechanismInterBrokerProtocol, config.saslInterBrokerHandshakeRequestEnable ) val selector = new Selector ( NetworkReceive .UNLIMITED , Selector .NO_IDLE_TIMEOUT_MS , metrics, time, "controller-channel" , Map ("broker-id" -> broker.id.toString).asJava, false , channelBuilder ) new NetworkClient ( selector, new ManualMetadataUpdater (Seq (brokerNode).asJava), config.brokerId.toString, 1 , 0 , Selectable .USE_DEFAULT_BUFFER_SIZE , Selectable .USE_DEFAULT_BUFFER_SIZE , config.requestTimeoutMs, time, false ) } val threadName = threadNamePrefix match { case None => "Controller-%d-to-broker-%d-send-thread" .format(config.brokerId, broker.id) case Some (name) => "%s:Controller-%d-to-broker-%d-send-thread" .format(name, config.brokerId, broker.id) } val requestThread = new RequestSendThread ( config.brokerId, controllerContext, messageQueue, networkClient, brokerNode, config, time, threadName) requestThread.setDaemon(false ) brokerStateInfo.put(broker.id, ControllerBrokerStateInfo (networkClient, brokerNode, messageQueue, requestThread)) }
RequestSendThread extends the ShutdownableThread abstract class, so the interesting part is its RequestSendThread#doWork method:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 override def doWork (): Unit = { def backoff (): Unit = CoreUtils .swallowTrace(Thread .sleep(100 )) val QueueItem (apiKey, requestBuilder, callback) = queue.take() import NetworkClientBlockingOps ._ var clientResponse: ClientResponse = null try { lock synchronized { var isSendSuccessful = false while (isRunning.get() && !isSendSuccessful) { try { if (!brokerReady()) { isSendSuccessful = false backoff() } else { val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder, time.milliseconds(), true ) clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time) isSendSuccessful = true } } catch { } } if (clientResponse != null ) { val api = ApiKeys .forId(clientResponse.requestHeader.apiKey) if (api != ApiKeys .LEADER_AND_ISR && api != ApiKeys .STOP_REPLICA && api != ApiKeys .UPDATE_METADATA_KEY ) throw new KafkaException (s"Unexpected apiKey received: $apiKey " ) val response = clientResponse.responseBody if (callback != null ) { callback(response) } } } } catch { } }
While running, the RequestSendThread keeps draining the blocking queue of pending requests. Each queue element is a QueueItem, which wraps the request type, the request object, and a response callback. If there is a request to send, the thread sends it to the target broker, blocks waiting for the response, and then invokes the callback once the response arrives. ControllerChannelManager#sendRequest, the method for sending a request, essentially just wraps the request into a QueueItem and appends it to the blocking queue, leaving the actual sending to the RequestSendThread asynchronously.
Batching Requests to Brokers
The ControllerBrokerRequestBatch class implements batch sending of LeaderAndIsrRequest, StopReplicaRequest and UpdateMetadataRequest requests to all live broker nodes. It defines three collections that buffer the data for these three request types, along with methods for adding pending requests and for flushing them in a batch. Its fields are defined as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class ControllerBrokerRequestBatch (controller: KafkaController ) extends Logging { val controllerContext: ControllerContext = controller.controllerContext val controllerId: Int = controller.config.brokerId val leaderAndIsrRequestMap = mutable.Map .empty[Int , mutable.Map [TopicPartition , PartitionStateInfo ]] val stopReplicaRequestMap = mutable.Map .empty[Int , Seq [StopReplicaRequestInfo ]] val updateMetadataRequestBrokerSet = mutable.Set .empty[Int ] val updateMetadataRequestPartitionInfoMap = mutable.Map .empty[TopicPartition , PartitionStateInfo ] }
ControllerBrokerRequestBatch provides the ControllerBrokerRequestBatch#newBatch method, which verifies that the three request buffers are empty; if any of them is not, it throws an exception, meaning that some requests have not yet been sent and no new batch may be started. It also provides the ControllerBrokerRequestBatch#clear method, which empties the three collections.
The following three methods add the corresponding request data to their collections:
ControllerBrokerRequestBatch#addLeaderAndIsrRequestForBrokers: adds the data needed for a pending LeaderAndIsrRequest to the leaderAndIsrRequestMap collection, and also builds an UpdateMetadataRequest destined for all live brokers, buffering it in the updateMetadataRequestPartitionInfoMap collection.
ControllerBrokerRequestBatch#addStopReplicaRequestForBrokers: adds the data needed for a pending StopReplicaRequest to the stopReplicaRequestMap collection.
ControllerBrokerRequestBatch#addUpdateMetadataRequestForBrokers: adds the data needed for an UpdateMetadataRequest to the updateMetadataRequestPartitionInfoMap collection.
The ControllerBrokerRequestBatch#sendRequestsToBrokers method iterates over these three collections and calls KafkaController#sendRequest to send the requests, which ultimately relies on the RequestSendThread analyzed above to perform the sends asynchronously.
The methods defined in ControllerBrokerRequestBatch are all fairly straightforward, so they are not analyzed individually here. A small sketch of the typical usage pattern follows.
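The sketch below captures only the batch-then-flush contract described above (newBatch must see empty buffers, add methods accumulate per-broker data, sendRequestsToBrokers flushes and clears). The single map and the string payloads are illustrative simplifications, not the real class, which keeps three separate request buffers.

import scala.collection.mutable

class RequestBatchSketch {
  private val pending = mutable.Map.empty[Int, mutable.Buffer[String]]  // brokerId -> pending payloads

  // Like newBatch(): refuse to start a batch while previous requests are still buffered.
  def newBatch(): Unit =
    if (pending.nonEmpty)
      throw new IllegalStateException(s"Previous batch not yet sent: $pending")

  def addRequestForBrokers(brokerIds: Seq[Int], payload: String): Unit =
    brokerIds.foreach(id => pending.getOrElseUpdate(id, mutable.Buffer.empty[String]) += payload)

  // Like sendRequestsToBrokers(): hand everything to the per-broker send threads, then clear.
  def sendRequestsToBrokers(controllerEpoch: Int): Unit = {
    pending.foreach { case (brokerId, payloads) =>
      println(s"epoch=$controllerEpoch -> broker $brokerId: ${payloads.mkString(", ")}")
    }
    pending.clear()
  }
}

object RequestBatchDemo extends App {
  val batch = new RequestBatchSketch
  batch.newBatch()
  batch.addRequestForBrokers(Seq(0, 1), "LeaderAndIsrRequest(topic-0)")
  batch.addRequestForBrokers(Seq(0, 1, 2), "UpdateMetadataRequest")
  batch.sendRequestsToBrokers(controllerEpoch = 5)
}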
Partition State Management
PartitionStateMachine implements the Kafka Controller's partition state machine, which manages the state of every partition in the cluster. Each Kafka Controller has its own partition state machine, but the machine is only started when the Controller instance becomes the leader. The state machine describes partition states with the PartitionState trait and provides several case object implementations, one per state:
NewPartition: a newly created partition. The partition may already have an AR set assigned, but it has no ISR set and no leader replica yet.
OnlinePartition: a partition enters this state once a leader replica has been elected for it.
OfflinePartition: a partition switches to this state when its leader replica becomes unavailable.
NonExistentPartition: describes a partition that does not exist, or that existed before but has since been deleted.
The partition state transition diagram (omitted here) captures the legal transitions between these states; a small code sketch of the same constraints follows.
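The sketch below restates the transition rules as a map from target state to its legal previous states, exactly as they are enforced by assertValidPreviousStates in the handleStateChange code shown later; the sealed trait and case objects are a standalone re-declaration for illustration, not the real Kafka types.

sealed trait PState
case object NonExistentPartition extends PState
case object NewPartition         extends PState
case object OnlinePartition      extends PState
case object OfflinePartition     extends PState

object PartitionTransitions {
  // target state -> set of legal previous states
  val validPrevious: Map[PState, Set[PState]] = Map(
    NewPartition         -> Set(NonExistentPartition),
    OnlinePartition      -> Set(NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition     -> Set(NewPartition, OnlinePartition, OfflinePartition),
    NonExistentPartition -> Set(OfflinePartition)
  )

  def assertValid(current: PState, target: PState): Unit =
    require(validPrevious(target).contains(current), s"Illegal transition $current -> $target")
}

object PartitionTransitionsDemo extends App {
  PartitionTransitions.assertValid(NewPartition, OnlinePartition)         // ok
  // PartitionTransitions.assertValid(NewPartition, NonExistentPartition) // would throw
}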
The fields of PartitionStateMachine are defined as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class PartitionStateMachine (controller: KafkaController ) extends Logging { private val controllerContext = controller.controllerContext private val controllerId = controller.config.brokerId private val zkUtils = controllerContext.zkUtils private val partitionState: mutable.Map [TopicAndPartition , PartitionState ] = mutable.Map .empty private val brokerRequestBatch = new ControllerBrokerRequestBatch (controller) private val hasStarted = new AtomicBoolean (false ) private val noOpPartitionLeaderSelector = new NoOpLeaderSelector (controllerContext) private val topicChangeListener = new TopicChangeListener (controller) private val deleteTopicsListener = new DeleteTopicsListener (controller) private val partitionModificationsListeners: mutable.Map [String , PartitionModificationsListener ] = mutable.Map .empty }
When a Kafka Controller instance is elected from follower to leader, it calls PartitionStateMachine#startup to start its partition state machine. The method is implemented as follows:
def startup() {
  this.initializePartitionState()
  hasStarted.set(true)
  this.triggerOnlinePartitionStateChange()
}
The state machine records the state of every available partition in the cluster in its PartitionStateMachine#partitionState field. During startup it initializes this field, i.e. it initializes the state of every topic partition, and then tries to switch every available partition in OfflinePartition or NewPartition state to OnlinePartition.
PartitionStateMachine#initializePartitionState iterates over every topic partition in the cluster and tries to fetch the partition's leader replica and ISR information. If that information does not exist, the partition is initialized to NewPartition. Otherwise, the method checks whether the broker hosting the leader replica is alive: if it is, the partition is initialized to OnlinePartition, and otherwise to OfflinePartition. The implementation is as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private def initializePartitionState () { for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) { controllerContext.partitionLeadershipInfo.get(topicPartition) match { case Some (currentLeaderIsrAndEpoch) => if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader)) partitionState.put(topicPartition, OnlinePartition ) else partitionState.put(topicPartition, OfflinePartition ) case None => partitionState.put(topicPartition, NewPartition ) } } }
PartitionStateMachine#triggerOnlinePartitionStateChange iterates over every available partition (excluding partitions of topics queued for deletion) and tries to switch partitions in OfflinePartition or NewPartition state to OnlinePartition. The implementation is as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def triggerOnlinePartitionStateChange () { try { brokerRequestBatch.newBatch() for ((topicAndPartition, partitionState) <- partitionState if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) { if (partitionState.equals(OfflinePartition ) || partitionState.equals(NewPartition )) this .handleStateChange( topicAndPartition.topic, topicAndPartition.partition, OnlinePartition , controller.offlinePartitionSelector, (new CallbackBuilder ).build) } brokerRequestBatch.sendRequestsToBrokers(controller.epoch) } catch { } }
The actual state transition is delegated to the PartitionStateMachine#handleStateChange method, which is analyzed in the next subsection.
Partition State Transitions
The partition state machine defines the PartitionStateMachine#handleStateChanges method, which switches the partitions in a given set of topic partitions to a given target state. The method is implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def handleStateChanges (partitions: Set [TopicAndPartition ], targetState: PartitionState , leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector, callbacks: Callbacks = (new CallbackBuilder ).build) { info("Invoking state change to %s for partitions %s" .format(targetState, partitions.mkString("," ))) try { brokerRequestBatch.newBatch() partitions.foreach { topicAndPartition => this .handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks) } brokerRequestBatch.sendRequestsToBrokers(controller.epoch) } catch { } }
The core of the work is done by PartitionStateMachine#handleStateChange, which we already encountered when analyzing the startup of the state machine. Its implementation is shown below (log statements omitted):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private def handleStateChange (topic: String , partition: Int , targetState: PartitionState , leaderSelector: PartitionLeaderSelector , callbacks: Callbacks ) { val topicAndPartition = TopicAndPartition (topic, partition) if (!hasStarted.get) throw new StateChangeFailedException (("Controller %d epoch %d initiated state change for partition %s to %s failed because " + "the partition state machine has not started" ).format(controllerId, controller.epoch, topicAndPartition, targetState)) val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition ) try { targetState match { case NewPartition => this .assertValidPreviousStates(topicAndPartition, List (NonExistentPartition ), NewPartition ) partitionState.put(topicAndPartition, NewPartition ) case OnlinePartition => this .assertValidPreviousStates(topicAndPartition, List (NewPartition , OnlinePartition , OfflinePartition ), OnlinePartition ) partitionState(topicAndPartition) match { case NewPartition => this .initializeLeaderAndIsrForPartition(topicAndPartition) case OfflinePartition => this .electLeaderForPartition(topic, partition, leaderSelector) case OnlinePartition => this .electLeaderForPartition(topic, partition, leaderSelector) case _ => } partitionState.put(topicAndPartition, OnlinePartition ) case OfflinePartition => this .assertValidPreviousStates(topicAndPartition, List (NewPartition , OnlinePartition , OfflinePartition ), OfflinePartition ) partitionState.put(topicAndPartition, OfflinePartition ) case NonExistentPartition => this .assertValidPreviousStates(topicAndPartition, List (OfflinePartition ), NonExistentPartition ) partitionState.put(topicAndPartition, NonExistentPartition ) } } catch { } }
The overall approach to a partition state transition is to validate the current partition state against the target state, ensuring that the current state is a legal predecessor of the target state. The PartitionStateMachine#assertValidPreviousStates method performs this validation and throws an exception if the predecessor state is not legal.
If the target state is NewPartition, OfflinePartition or NonExistentPartition, the transition itself is simple. The interesting case is a transition to OnlinePartition, which splits into three scenarios depending on the previous state:
If the previous state is NewPartition, a leader replica and an ISR set must be assigned to the topic partition.
If the previous state is OfflinePartition, a new leader replica must be elected for the topic partition.
If the previous state is OnlinePartition, a new leader replica must be re-elected for the topic partition.
Scenarios 2 and 3 are handled by the PartitionStateMachine#electLeaderForPartition method, which we will look at shortly when analyzing partition leader election. Scenario 1 is handled by PartitionStateMachine#initializeLeaderAndIsrForPartition, implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private def initializeLeaderAndIsrForPartition (topicAndPartition: TopicAndPartition ) { val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition) val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r)) liveAssignedReplicas.size match { case 0 => val failMsg = "encountered error during state change of partition %s from New to Online, assigned replicas are [%s], live brokers are [%s]. No assigned replica is alive." .format(topicAndPartition, replicaAssignment.mkString("," ), controllerContext.liveBrokerIds) stateChangeLogger.error("Controller %d epoch %d " .format(controllerId, controller.epoch) + failMsg) throw new StateChangeFailedException (failMsg) case _ => debug("Live assigned replicas for partition %s are: [%s]" .format(topicAndPartition, liveAssignedReplicas)) val leader = liveAssignedReplicas.head val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch (new LeaderAndIsr (leader, liveAssignedReplicas.toList), controller.epoch) debug("Initializing leader and isr for partition %s to %s" .format(topicAndPartition, leaderIsrAndControllerEpoch)) try { zkUtils.createPersistentPath( getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch)) controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch) brokerRequestBatch.addLeaderAndIsrRequestForBrokers( liveAssignedReplicas, topicAndPartition.topic, topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment) } catch { } } }
For a topic partition in NewPartition state, the first replica among the partition's live replicas is elected as the leader, all live replicas are added to the ISR set, this information is recorded in ZK, and a LeaderAndIsrRequest is sent to the affected broker nodes so that they switch their replica roles accordingly.
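The selection itself is just "first live replica in the AR list becomes leader, all live replicas form the ISR". A minimal sketch of that rule, with hypothetical types (LeaderAndIsrSketch is not the real LeaderAndIsr class):

final case class LeaderAndIsrSketch(leader: Int, isr: List[Int])

object InitialLeaderAndIsrDemo extends App {
  // First live replica in the AR list becomes the leader; all live replicas form the ISR.
  def initialLeaderAndIsr(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Option[LeaderAndIsrSketch] = {
    val liveAssigned = assignedReplicas.filter(liveBrokerIds.contains)
    liveAssigned.headOption.map(leader => LeaderAndIsrSketch(leader, liveAssigned.toList))
  }

  // AR = [2, 0, 1] and broker 2 is down => leader = 0, ISR = [0, 1]
  println(initialLeaderAndIsr(Seq(2, 0, 1), liveBrokerIds = Set(0, 1)))
}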
Partition Leader Replica Election
The partition state machine defines PartitionStateMachine#electLeaderForPartition, which elects a leader replica for a given topic partition using a given partition leader selector. The method is implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 def electLeaderForPartition (topic: String , partition: Int , leaderSelector: PartitionLeaderSelector ) { val topicAndPartition = TopicAndPartition (topic, partition) try { var zookeeperPathUpdateSucceeded: Boolean = false var newLeaderAndIsr: LeaderAndIsr = null var replicasForThisPartition: Seq [Int ] = Seq .empty[Int ] while (!zookeeperPathUpdateSucceeded) { val currentLeaderIsrAndEpoch = this .getLeaderIsrAndEpochOrThrowException(topic, partition) val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch if (controllerEpoch > controller.epoch) { } val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr) val (updateSucceeded, newVersion) = ReplicationUtils .updateLeaderAndIsr( zkUtils, topic, partition, leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion) newLeaderAndIsr = leaderAndIsr newLeaderAndIsr.zkVersion = newVersion zookeeperPathUpdateSucceeded = updateSucceeded replicasForThisPartition = replicas } val newLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch (newLeaderAndIsr, controller.epoch) controllerContext.partitionLeadershipInfo.put(TopicAndPartition (topic, partition), newLeaderIsrAndControllerEpoch) val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition (topic, partition)) brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderIsrAndControllerEpoch, replicas) } catch { } debug("After leader election, leader cache is updated to %s" .format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2)))) }
During leader election, the given partition leader selector picks a new leader replica for the topic partition and returns the new ISR set. Because the partition's state has changed, the updated state must be written back to ZK, and a LeaderAndIsrRequest is sent to all live broker nodes in the cluster so that they switch their replica roles and refresh their locally cached cluster metadata.
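Note that the ZK write inside the while loop above is a conditional update keyed on the node's zkVersion and is retried until it succeeds. Below is a self-contained sketch of that optimistic retry pattern; FakeZkNode and the leader strings are in-memory stand-ins for illustration, not ZK or Kafka APIs.

import java.util.concurrent.atomic.AtomicReference

final case class Versioned[A](data: A, version: Int)

// In-memory stand-in for a versioned ZK node: conditionalUpdate mimics a ZK setData
// call that only succeeds while the expected version still matches.
class FakeZkNode[A](initial: A) {
  private val ref = new AtomicReference(Versioned(initial, 0))
  def read(): Versioned[A] = ref.get()
  def conditionalUpdate(newData: A, expectedVersion: Int): (Boolean, Int) = {
    val cur = ref.get()
    if (cur.version == expectedVersion && ref.compareAndSet(cur, Versioned(newData, cur.version + 1)))
      (true, cur.version + 1)
    else
      (false, cur.version)
  }
}

object LeaderUpdateRetryDemo extends App {
  val node = new FakeZkNode[String]("leader=1,isr=1,2,3")
  var updated = false
  while (!updated) {
    val current = node.read()           // re-read the current state and its version
    val proposed = "leader=2,isr=2,3"   // pretend the leader selector produced this
    val (ok, _) = node.conditionalUpdate(proposed, current.version)
    updated = ok                        // on version conflict, loop and try again
  }
  println(node.read())
}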
The PartitionLeaderSelector trait abstracts the partition leader selector and is defined as follows:
trait PartitionLeaderSelector {
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
Kafka currently ships five partition leader selection strategies built on the PartitionLeaderSelector trait:
NoOpLeaderSelector
OfflinePartitionLeaderSelector
ReassignedPartitionLeaderSelector
PreferredReplicaPartitionLeaderSelector
ControlledShutdownLeaderSelector
NoOpLeaderSelector is the simplest of them: it does essentially nothing and simply returns the partition's current leader replica and ISR set, exactly as passed in. The remaining four strategies are analyzed below.
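For reference, this is roughly what the no-op behaviour looks like, written against simplified hypothetical types (LeaderIsrSketch and the String partition key stand in for LeaderAndIsr and TopicAndPartition):

final case class LeaderIsrSketch(leader: Int, isr: List[Int])

trait LeaderSelectorSketch {
  // Mirrors the shape of PartitionLeaderSelector#selectLeader: the new leader/ISR,
  // plus the replicas that should receive a LeaderAndIsrRequest.
  def selectLeader(partition: String, current: LeaderIsrSketch): (LeaderIsrSketch, Seq[Int])
}

// What NoOpLeaderSelector effectively does: hand back the current leader and ISR unchanged.
object NoOpSelectorSketch extends LeaderSelectorSketch {
  override def selectLeader(partition: String, current: LeaderIsrSketch): (LeaderIsrSketch, Seq[Int]) =
    (current, current.isr)
}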
OfflinePartitionLeaderSelector
The OfflinePartitionLeaderSelector tries to pick the new leader replica from the ISR set; if the ISR contains no live replica, and configuration allows it, it falls back to picking the new leader from the AR set. The strategy is implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 def selectLeader (topicAndPartition: TopicAndPartition , currentLeaderAndIsr: LeaderAndIsr ): (LeaderAndIsr , Seq [Int ]) = { controllerContext.partitionReplicaAssignment.get(topicAndPartition) match { case Some (assignedReplicas) => val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion val newLeaderAndIsr = if (liveBrokersInIsr.isEmpty) { if (!LogConfig .fromProps(config.originals, AdminUtils .fetchEntityConfig( controllerContext.zkUtils, ConfigType .Topic , topicAndPartition.topic)).uncleanLeaderElectionEnable) { } debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s" .format(topicAndPartition, liveAssignedReplicas.mkString("," ))) if (liveAssignedReplicas.isEmpty) { } else { ControllerStats .uncleanLeaderElectionRate.mark() val newLeader = liveAssignedReplicas.head warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString("," ))) new LeaderAndIsr (newLeader, currentLeaderEpoch + 1 , List (newLeader), currentLeaderIsrZkPathVersion + 1 ) } } else { val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r)) val newLeader = liveReplicasInIsr.head debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString("," ))) new LeaderAndIsr (newLeader, currentLeaderEpoch + 1 , liveBrokersInIsr, currentLeaderIsrZkPathVersion + 1 ) } info("Selected new leader and ISR %s for offline partition %s" .format(newLeaderAndIsr.toString(), topicAndPartition)) (newLeaderAndIsr, liveAssignedReplicas) case None => throw new NoReplicaOnlineException ("Partition %s doesn't have replicas assigned to it" .format(topicAndPartition)) } }
When the new leader is chosen from the ISR set, the live replicas of the ISR become the new ISR set. When the new leader is chosen from the AR set, unclean.leader.election.enable=true must be configured, and in that case the new ISR set contains only the leader replica.
ReassignedPartitionLeaderSelector
When a reassignment is in progress, the ReassignedPartitionLeaderSelector picks as the new leader a replica that is both in the newly assigned AR set and in the ISR set, and keeps the previous ISR set as the new ISR set. The strategy is implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def selectLeader (topicAndPartition: TopicAndPartition , currentLeaderAndIsr: LeaderAndIsr ): (LeaderAndIsr , Seq [Int ]) = { val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion val aliveReassignedInSyncReplicas = reassignedInSyncReplicas .filter(r => controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r)) val newLeaderOpt = aliveReassignedInSyncReplicas.headOption newLeaderOpt match { case Some (newLeader) => (new LeaderAndIsr (newLeader, currentLeaderEpoch + 1 , currentLeaderAndIsr.isr, currentLeaderIsrZkPathVersion + 1 ), reassignedInSyncReplicas) case None => reassignedInSyncReplicas.size match { case 0 => throw new NoReplicaOnlineException ("List of reassigned replicas for partition %s is empty. Current leader and ISR: [%s]" .format(topicAndPartition, currentLeaderAndIsr)) case _ => throw new NoReplicaOnlineException ("None of the reassigned replicas for partition %s are in-sync with the leader. Current leader and ISR: [%s]" .format(topicAndPartition, currentLeaderAndIsr)) } } }
As the implementation shows, an exception is thrown if no replica satisfies these conditions.
PreferredReplicaPartitionLeaderSelector
The PreferredReplicaPartitionLeaderSelector tries to elect the preferred replica (the first replica in the AR set) as the leader, provided that it is in the ISR set, and keeps the current ISR set as the new ISR set. The strategy is implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def selectLeader (topicAndPartition: TopicAndPartition , currentLeaderAndIsr: LeaderAndIsr ): (LeaderAndIsr , Seq [Int ]) = { val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) val preferredReplica = assignedReplicas.head val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader if (currentLeader == preferredReplica) { throw new LeaderElectionNotNeededException ("Preferred replica %d is already the current leader for partition %s" .format(preferredReplica, topicAndPartition)) } else { info("Current leader %d for partition %s is not the preferred replica." .format(currentLeader, topicAndPartition) + " Triggering preferred replica leader election" ) if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) { (new LeaderAndIsr (preferredReplica, currentLeaderAndIsr.leaderEpoch + 1 , currentLeaderAndIsr.isr, currentLeaderAndIsr.zkVersion + 1 ), assignedReplicas) } else { throw new StateChangeFailedException ("Preferred replica %d for partition " .format(preferredReplica) + "%s is either not alive or not in the isr. Current leader and ISR: [%s]" .format(topicAndPartition, currentLeaderAndIsr)) } } }
As the implementation shows, an exception is thrown if the broker hosting the preferred replica is down, or if the preferred replica is not in the ISR set.
ControlledShutdownLeaderSelector
The ControlledShutdownLeaderSelector removes from the ISR set those replicas whose broker is shutting down, keeps the remaining replicas as the new ISR set, and picks one of them as the new leader replica. The strategy is implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def selectLeader (topicAndPartition: TopicAndPartition , currentLeaderAndIsr: LeaderAndIsr ): (LeaderAndIsr , Seq [Int ]) = { val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion val currentLeader = currentLeaderAndIsr.leader val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId)) liveAssignedReplicas.find(newIsr.contains) match { case Some (newLeader) => debug("Partition %s : current leader = %d, new leader = %d" .format(topicAndPartition, currentLeader, newLeader)) (LeaderAndIsr (newLeader, currentLeaderEpoch + 1 , newIsr, currentLeaderIsrZkPathVersion + 1 ), liveAssignedReplicas) case None => throw new StateChangeFailedException ("No other replicas in ISR %s for %s besides shutting down brokers %s" .format(currentLeaderAndIsr.isr.mkString("," ), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString("," ))) } }
Replica State Management
ReplicaStateMachine implements the Kafka Controller's replica state machine, which manages the state of every replica in the cluster. Each Kafka Controller has its own replica state machine, but the machine is only started when the Controller instance becomes the leader. The state machine describes replica states with the ReplicaState trait and provides several case object implementations, one per state (the legal transitions between them are sketched in code right after the list):
NewReplica: a newly created replica; a replica in this state can only be a follower.
OnlineReplica: the state of a replica once it is part of the AR set; it may be either a leader or a follower.
OfflineReplica: the state of a replica whose hosting broker has gone down.
ReplicaDeletionStarted: when a replica is about to be deleted, it is first switched to this state, and then the deletion is carried out.
ReplicaDeletionSuccessful: the state of a replica that has been deleted successfully.
ReplicaDeletionIneligible: the state of a replica whose deletion has failed.
NonExistentReplica: a successfully deleted replica eventually ends up in this state.
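As with partitions, the replica transitions are constrained by assertValidPreviousStates. The sketch below restates those constraints as read from the handleStateChange code shown further below; the sealed trait and case objects are standalone re-declarations for illustration, not the real Kafka types.

sealed trait RState
case object NonExistentReplica        extends RState
case object NewReplica                extends RState
case object OnlineReplica             extends RState
case object OfflineReplica            extends RState
case object ReplicaDeletionStarted    extends RState
case object ReplicaDeletionSuccessful extends RState
case object ReplicaDeletionIneligible extends RState

object ReplicaTransitions {
  // target state -> set of legal previous states
  val validPrevious: Map[RState, Set[RState]] = Map(
    NewReplica                -> Set(NonExistentReplica),
    OnlineReplica             -> Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
    OfflineReplica            -> Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
    ReplicaDeletionStarted    -> Set(OfflineReplica),
    ReplicaDeletionSuccessful -> Set(ReplicaDeletionStarted),
    ReplicaDeletionIneligible -> Set(ReplicaDeletionStarted),
    NonExistentReplica        -> Set(ReplicaDeletionSuccessful)
  )

  def isValid(current: RState, target: RState): Boolean =
    validPrevious(target).contains(current)
}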
The fields of ReplicaStateMachine are defined as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class ReplicaStateMachine (controller: KafkaController ) extends Logging { private val controllerContext = controller.controllerContext private val controllerId = controller.config.brokerId private val zkUtils = controllerContext.zkUtils private val replicaState: mutable.Map [PartitionAndReplica , ReplicaState ] = mutable.Map .empty private val brokerChangeListener = new BrokerChangeListener (controller) private val brokerRequestBatch = new ControllerBrokerRequestBatch (controller) private val hasStarted = new AtomicBoolean (false ) }
When a Kafka Controller instance is elected from follower to leader, it calls ReplicaStateMachine#startup to start its replica state machine. The method is implemented as follows:
def startup() {
  this.initializeReplicaState()
  hasStarted.set(true)
  this.handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
}
The state machine records the state of every replica in the cluster in its ReplicaStateMachine#replicaState field. During startup it initializes this field, i.e. it initializes the state of every replica, switching all available replicas to OnlineReplica and all unavailable replicas to ReplicaDeletionIneligible.
ReplicaStateMachine#initializeReplicaState iterates over the AR set of every topic partition and initializes the locally recorded replica state depending on whether the broker hosting each replica is alive. The implementation is as follows:
private def initializeReplicaState() {
  for ((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
    val topic = topicPartition.topic
    val partition = topicPartition.partition
    assignedReplicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
      if (controllerContext.liveBrokerIds.contains(replicaId))
        replicaState.put(partitionAndReplica, OnlineReplica)
      else
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
    }
  }
}
ReplicaStateMachine#handleStateChanges iterates over all live replicas and tries to switch each of them to OnlineReplica. The implementation is as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def handleStateChanges (replicas: Set [PartitionAndReplica ], targetState: ReplicaState , callbacks: Callbacks = (new CallbackBuilder ).build) { if (replicas.nonEmpty) { info("Invoking state change to %s for replicas %s" .format(targetState, replicas.mkString("," ))) try { brokerRequestBatch.newBatch() replicas.foreach(r => this .handleStateChange(r, targetState, callbacks)) brokerRequestBatch.sendRequestsToBrokers(controller.epoch) } catch { case e: Throwable => error("Error while moving some replicas to %s state" .format(targetState), e) } } }
The actual state transition is delegated to ReplicaStateMachine#handleStateChange, the core method of ReplicaStateMachine, implemented as follows (some log statements omitted):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 def handleStateChange (partitionAndReplica: PartitionAndReplica , targetState: ReplicaState , callbacks: Callbacks ) { val topic = partitionAndReplica.topic val partition = partitionAndReplica.partition val replicaId = partitionAndReplica.replica val topicAndPartition = TopicAndPartition (topic, partition) if (!hasStarted.get) throw new StateChangeFailedException ( "Controller %d epoch %d initiated state change of replica %d for partition %s to %s failed because replica state machine has not started" .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState)) val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica ) try { val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition) targetState match { case NewReplica => this .assertValidPreviousStates(partitionAndReplica, List (NonExistentReplica ), targetState) val leaderIsrAndControllerEpochOpt = ReplicationUtils .getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) leaderIsrAndControllerEpochOpt match { case Some (leaderIsrAndControllerEpoch) => if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) throw new StateChangeFailedException ("Replica %d for partition %s cannot be moved to NewReplica" .format(replicaId, topicAndPartition) + "state as it is being requested to become leader" ) brokerRequestBatch.addLeaderAndIsrRequestForBrokers( List (replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment) case None => } replicaState.put(partitionAndReplica, NewReplica ) case ReplicaDeletionStarted => this .assertValidPreviousStates(partitionAndReplica, List (OfflineReplica ), targetState) replicaState.put(partitionAndReplica, ReplicaDeletionStarted ) brokerRequestBatch.addStopReplicaRequestForBrokers( List (replicaId), topic, partition, deletePartition = true , callbacks.stopReplicaResponseCallback) case ReplicaDeletionIneligible => this .assertValidPreviousStates(partitionAndReplica, List (ReplicaDeletionStarted ), targetState) replicaState.put(partitionAndReplica, ReplicaDeletionIneligible ) case ReplicaDeletionSuccessful => this .assertValidPreviousStates(partitionAndReplica, List (ReplicaDeletionStarted ), targetState) replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful ) case NonExistentReplica => this .assertValidPreviousStates(partitionAndReplica, List (ReplicaDeletionSuccessful ), targetState) val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId)) replicaState.remove(partitionAndReplica) case OnlineReplica => this .assertValidPreviousStates(partitionAndReplica, List (NewReplica , OnlineReplica , OfflineReplica , ReplicaDeletionIneligible ), targetState) replicaState(partitionAndReplica) match { case NewReplica => val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) if (!currentAssignedReplicas.contains(replicaId)) controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId) case _ 
=> controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { case Some (leaderIsrAndControllerEpoch) => brokerRequestBatch.addLeaderAndIsrRequestForBrokers( List (replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment) replicaState.put(partitionAndReplica, OnlineReplica ) case None => } } replicaState.put(partitionAndReplica, OnlineReplica ) case OfflineReplica => this .assertValidPreviousStates(partitionAndReplica, List (NewReplica , OnlineReplica , OfflineReplica , ReplicaDeletionIneligible ), targetState) brokerRequestBatch.addStopReplicaRequestForBrokers(List (replicaId), topic, partition, deletePartition = false ) val leaderAndIsrIsEmpty: Boolean = controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { case Some (_) => controller.removeReplicaFromIsr(topic, partition, replicaId) match { case Some (updatedLeaderIsrAndControllerEpoch) => val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) { brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId), topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) } replicaState.put(partitionAndReplica, OfflineReplica ) false case None => true } case None => true } if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) throw new StateChangeFailedException ( "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty" .format(replicaId, topicAndPartition)) } } catch { } }
Just like the partition state transition in PartitionStateMachine#handleStateChange analyzed earlier, a replica state transition validates the current replica state against the target state, ensuring that the current state is a legal predecessor of the target state. ReplicaStateMachine#assertValidPreviousStates performs this validation and throws an exception if the predecessor state is not legal. The per-target-state transition logic follows the same ideas as the partition state transitions described above, so it is not expanded here.
Topic Deletion Mechanism
TopicDeletionManager is responsible for deleting the topics designated by the administrator. It defines the DeleteTopicsThread, which deletes the queued topics asynchronously. The fields of TopicDeletionManager are defined as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class TopicDeletionManager (controller: KafkaController , initialTopicsToBeDeleted: Set [String ] = Set .empty, initialTopicsIneligibleForDeletion: Set [String ] = Set .empty ) extends Logging { val controllerContext: ControllerContext = controller.controllerContext val partitionStateMachine: PartitionStateMachine = controller.partitionStateMachine val replicaStateMachine: ReplicaStateMachine = controller.replicaStateMachine val deleteTopicsCond: Condition = deleteLock.newCondition() val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean (false ) var deleteTopicsThread: DeleteTopicsThread = _ val isDeleteTopicEnabled: lang.Boolean = controller.config.deleteTopicEnable val topicsToBeDeleted: mutable.Set [String ] = if (isDeleteTopicEnabled) { mutable.Set .empty[String ] ++ initialTopicsToBeDeleted } else { val zkUtils = controllerContext.zkUtils for (topic <- initialTopicsToBeDeleted) { val deleteTopicPath = getDeleteTopicPath(topic) info("Removing " + deleteTopicPath + " since delete topic is disabled" ) zkUtils.zkClient.delete(deleteTopicPath) } mutable.Set .empty[String ] } val topicsIneligibleForDeletion: mutable.Set [String ] = mutable.Set .empty[String ] ++ (initialTopicsIneligibleForDeletion & topicsToBeDeleted) val partitionsToBeDeleted: mutable.Set [TopicAndPartition ] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic) }
Kafka provides the delete.topic.enable configuration option to control whether topic deletion is enabled. When it is disabled, no actual deletion takes place; the topics marked for deletion are simply removed from under the /admin/delete_topics ZK node. In addition, as the field definitions above show, TopicDeletionManager also tracks the set of topics that cannot currently be deleted. A topic is considered temporarily ineligible for deletion if any of the following holds (see the sketch after this list):
One of the topic's partitions is undergoing replica reassignment.
One of the topic's partitions is undergoing preferred replica election.
One of the topic's replicas is unavailable, i.e. its hosting broker is down.
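The sketch below expresses these three conditions as a single predicate over a simplified, hypothetical view of the controller context; ClusterView and isTopicIneligibleForDeletion are illustrative names, not the real TopicDeletionManager API.

object TopicDeletionEligibilitySketch {
  final case class ClusterView(
    partitionsBeingReassigned: Set[(String, Int)],                    // (topic, partition) under reassignment
    partitionsUndergoingPreferredReplicaElection: Set[(String, Int)], // (topic, partition) under preferred election
    replicaAssignment: Map[(String, Int), Seq[Int]],                  // partition -> AR
    liveBrokerIds: Set[Int]
  )

  // True if any of the three "cannot delete right now" conditions holds for the topic.
  def isTopicIneligibleForDeletion(topic: String, view: ClusterView): Boolean = {
    val reassignmentInProgress = view.partitionsBeingReassigned.exists(_._1 == topic)
    val preferredElectionInProgress = view.partitionsUndergoingPreferredReplicaElection.exists(_._1 == topic)
    val someReplicaDead = view.replicaAssignment.collect {
      case ((t, _), replicas) if t == topic => replicas
    }.flatten.exists(r => !view.liveBrokerIds.contains(r))
    reassignmentInProgress || preferredElectionInProgress || someReplicaDead
  }
}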
When a Kafka Controller instance becomes the leader, it calls TopicDeletionManager#start to start the topic deletion mechanism. The method mainly starts the background deletion thread, DeleteTopicsThread:
def start() {
  if (isDeleteTopicEnabled) {
    deleteTopicsThread = new DeleteTopicsThread()
    if (topicsToBeDeleted.nonEmpty)
      deleteTopicStateChanged.set(true)
    deleteTopicsThread.start()
  }
}
DeleteTopicsThread extends the ShutdownableThread abstract class; its DeleteTopicsThread#doWork method is implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 override def doWork () { awaitTopicDeletionNotification() if (!isRunning.get) return inLock(controllerContext.controllerLock) { val topicsQueuedForDeletion = Set .empty[String ] ++ topicsToBeDeleted topicsQueuedForDeletion.foreach { topic => if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) { completeDeleteTopic(topic) info("Deletion of topic %s successfully completed" .format(topic)) } else { if (controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) { } else { if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible )) { markTopicForDeletionRetry(topic) } } } if (isTopicEligibleForDeletion(topic)) { info("Deletion of topic %s (re)started" .format(topic)) onTopicDeletion(Set (topic)) } else if (isTopicIneligibleForDeletion(topic)) { info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion" .format(topic)) } } } }
The topic deletion flow can be summarized as follows:
Fetch the set of topics queued for deletion;
If all replicas of a topic have already been deleted successfully, update the state of the topic and of its partitions and replicas, and remove the topic's information from ZK and from the Controller context;
Otherwise, if any replica of the topic is in the deletion-in-progress state (ReplicaDeletionStarted), skip this topic and move on to the next one;
Otherwise, if any replica of the topic is in the deletion-failed state (ReplicaDeletionIneligible), reset the corresponding replicas to OfflineReplica so that deletion can be retried later;
Check whether the topic is currently eligible for deletion, and if so, start the deletion.
Let us focus on steps 2 and 5. Step 2 performs the post-deletion cleanup for a topic whose deletion has completed, including deregistering ZK listeners, switching the states of its partitions and replicas, and removing the topic's data from ZK and from the Controller context. The implementation is as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private def completeDeleteTopic (topic: String ) { partitionStateMachine.deregisterPartitionChangeListener(topic) val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful ) replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica ) val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic) partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition ) partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition ) topicsToBeDeleted -= topic partitionsToBeDeleted.retain(_.topic != topic) val zkUtils = controllerContext.zkUtils zkUtils.zkClient.deleteRecursive(getTopicPath(topic)) zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType .Topic , topic)) zkUtils.zkClient.delete(getDeleteTopicPath(topic)) controllerContext.removeTopic(topic) }
Step 5 sends an UpdateMetadataRequest to all live broker nodes to inform them that the topic is to be deleted, and then deletes the replicas in the AR sets of the topic's partitions. A topic may be deleted only if both of the following conditions hold:
The topic is queued for deletion and its deletion has not yet started.
The topic is not marked as ineligible for deletion.
For topics that satisfy both conditions, TopicDeletionManager#onTopicDeletion is called to carry out the deletion:
private def onTopicDeletion(topics: Set[String]) {
  info("Topic deletion callback for %s".format(topics.mkString(",")))
  val partitions = topics.flatMap(controllerContext.partitionsForTopic)
  controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
  val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
  topics.foreach { topic =>
    this.onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
  }
}
TopicDeletionManager#onPartitionDeletion simply fetches the AR sets of the topic's partitions and calls TopicDeletionManager#startReplicaDeletion to delete those replicas:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private def startReplicaDeletion (replicasForTopicsToBeDeleted: Set [PartitionAndReplica ]) { replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic => val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic) val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful ) val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible ) replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica ) debug("Deletion started for replicas %s" .format(replicasForDeletionRetry.mkString("," ))) controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted , new Callbacks .CallbackBuilder ().stopReplicaCallback(deleteTopicStopReplicaCallback).build) if (deadReplicasForTopic.nonEmpty) { debug("Dead Replicas (%s) found for topic %s" .format(deadReplicasForTopic.mkString("," ), topic)) markTopicIneligibleForDeletion(Set (topic)) } } } private def deleteTopicStopReplicaCallback (stopReplicaResponseObj: AbstractResponse , replicaId: Int ) { val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse ] debug("Delete topic callback invoked for %s" .format(stopReplicaResponse)) val responseMap = stopReplicaResponse.responses.asScala val partitionsInError = if (stopReplicaResponse.errorCode != Errors .NONE .code) responseMap.keySet else responseMap.filter { case (_, error) => error != Errors .NONE .code }.keySet val replicasInError = partitionsInError.map(p => PartitionAndReplica (p.topic, p.partition, replicaId)) inLock(controllerContext.controllerLock) { this .failReplicaDeletion(replicasInError) if (replicasInError.size != responseMap.size) { val deletedReplicas = responseMap.keySet -- partitionsInError this .completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica (p.topic, p.partition, replicaId))) } } }
Replicas whose deletion fails are switched to ReplicaDeletionIneligible, and the deletion thread is woken up to retry; replicas that are deleted successfully are switched to ReplicaDeletionSuccessful. Once every replica of a topic queued for deletion is in ReplicaDeletionSuccessful, the DeleteTopicsThread performs the post-deletion cleanup for that topic, i.e. step 2 analyzed above.
Replica Reassignment Mechanism
The Kafka Controller provides a partition replica reassignment mechanism for reassigning the replicas of specified topic partitions. It is triggered either when a Kafka Controller instance is elected leader, or when the administrator explicitly requests reassignment for some topic partitions. Here we walk through the scenario where reassignment is triggered by a Controller instance becoming leader; the administrator-triggered scenario is covered later when analyzing the ZK listener mechanism. The two differ only in their entry point; the actual execution flow is the same.
We start from KafkaController#maybeTriggerPartitionReassignment, which is invoked when a Kafka Controller instance is elected leader. The Controller context defines the ControllerContext#partitionsBeingReassigned field, which records the topic partitions that need, or are currently undergoing, replica reassignment. KafkaController#maybeTriggerPartitionReassignment simply iterates over this field and calls KafkaController#initiateReassignReplicasForTopicPartition for each topic partition that needs reassignment.
Before diving into the implementation, we need two terms: RAR, the partition's newly assigned AR set, and OAR, the partition's previous AR set. The method is implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 def initiateReassignReplicasForTopicPartition (topicAndPartition: TopicAndPartition , reassignedPartitionContext: ReassignedPartitionsContext ) { val newReplicas = reassignedPartitionContext.newReplicas val topic = topicAndPartition.topic val partition = topicAndPartition.partition try { val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition) assignedReplicasOpt match { case Some (assignedReplicas) => if (assignedReplicas == newReplicas) { throw new KafkaException ("Partition %s to be reassigned is already assigned to replicas" .format(topicAndPartition) + " %s. Ignoring request for partition reassignment" .format(newReplicas.mkString("," ))) } else { info("Handling reassignment of partition %s to new replicas %s" .format(topicAndPartition, newReplicas.mkString("," ))) this .watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext) controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext) deleteTopicManager.markTopicIneligibleForDeletion(Set (topic)) this .onPartitionReassignment(topicAndPartition, reassignedPartitionContext) } case None => throw new KafkaException ("Attempt to reassign partition %s that doesn't exist" .format(topicAndPartition)) } } catch { } }
If the newly assigned replica set differs from the current AR set, reassignment is triggered. Before it starts, a ReassignedPartitionsIsrChangeListener is registered for the topic partition, and the topic owning the partition is marked as ineligible for deletion. ReassignedPartitionsIsrChangeListener watches changes to the partition's ISR set; its implementation is analyzed in a later section. Let us now focus on the KafkaController#onPartitionReassignment method:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 def onPartitionReassignment (topicAndPartition: TopicAndPartition , reassignedPartitionContext: ReassignedPartitionsContext ) { val reassignedReplicas = reassignedPartitionContext.newReplicas if (!this .areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) { info("New replicas %s for partition %s being " .format(reassignedReplicas.mkString("," ), topicAndPartition) + "reassigned not yet caught up with the leader" ) val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet this .updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq) this .updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), newAndOldReplicas.toSeq) this .startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList) info("Waiting for new replicas %s for partition %s being " .format(reassignedReplicas.mkString("," ), topicAndPartition) + "reassigned to catch up with the leader" ) } else { val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet reassignedReplicas.foreach { replica => replicaStateMachine.handleStateChanges(Set (PartitionAndReplica (topicAndPartition.topic, topicAndPartition.partition, replica)), OnlineReplica ) } this .moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext) this .stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas) this .updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas) this .removePartitionFromReassignedPartitions(topicAndPartition) info("Removed partition %s from the list of reassigned partitions in zookeeper" .format(topicAndPartition)) controllerContext.partitionsBeingReassigned.remove(topicAndPartition) this .sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set (topicAndPartition)) deleteTopicManager.resumeDeletionForTopics(Set (topicAndPartition.topic)) } }
If one or more replicas in RAR are not yet in the ISR set, reassignment proceeds as follows (a concrete example of the set arithmetic appears after this list):
Use the union OAR + RAR as the partition's AR set, and write it to ZK and to the Controller context;
Send a LeaderAndIsrRequest to the brokers hosting the replicas in OAR + RAR, updating the leader epoch cached on those nodes;
Switch the newly added replicas (RAR - OAR) to the NewReplica state.
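A concrete example of the set arithmetic used by both phases of the reassignment (the broker ids are made up for illustration):

object ReassignmentSetsDemo extends App {
  val oar = Set(0, 1, 2) // previous assignment (OAR)
  val rar = Set(1, 2, 3) // newly assigned replicas (RAR)

  val transitionalAr    = oar ++ rar  // AR written during phase one: Set(0, 1, 2, 3)
  val newReplicasToAdd  = rar -- oar  // replicas to create and move to NewReplica: Set(3)
  val oldReplicasToDrop = oar -- rar  // replicas removed once RAR is fully in the ISR: Set(0)

  println(s"OAR+RAR=$transitionalAr, RAR-OAR=$newReplicasToAdd, OAR-RAR=$oldReplicasToDrop")
}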
At this point the reassignment is not yet complete. In practice, most reassignments produce a RAR set containing one or more replicas that are not yet in the ISR, so the flow above can be seen as the first phase of the reassignment. Once the new replicas have been running for a while, caught up with the leader, and joined the ISR one by one, the ReassignedPartitionsIsrChangeListener fires and the remaining steps are executed (i.e. the case where all replicas in RAR are in the ISR).
If every replica in RAR is already in the partition's ISR set, reassignment proceeds as follows:
Switch all replicas in RAR to the OnlineReplica state;
Use RAR as the partition's new AR set, and, if the leader replica is not in RAR or its broker has failed, elect a new leader using the ReassignedPartitionLeaderSelector;
Switch the old replicas (OAR - RAR) to the NonExistentReplica state;
Update the partition's AR set recorded in ZK;
Remove the partition's reassignment information from ZK and from the Controller context;
Send an UpdateMetadataRequest to all live broker nodes to propagate the post-reassignment cluster state;
Clear the topic's ineligible-for-deletion mark (set earlier) and wake up the DeleteTopicsThread.
The methods implementing each step are fairly simple, so we do not go deeper here.
ZK Listener Mechanism
Kafka's interaction with ZK relies on the zkclient library, which defines three listener interfaces: IZkDataListener, IZkChildListener and IZkStateListener. IZkDataListener watches changes to the data of a given node, IZkChildListener watches changes to the children of a given node, and IZkStateListener watches changes to the ZK connection state. In this section we focus on the ZK listeners related to the Kafka Controller.
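For readers unfamiliar with zkclient, the sketch below shows how a child listener is typically wired up. It assumes the org.I0Itec.zkclient library on the classpath; the ZK address, timeouts and path are placeholders, and serialization and error handling are omitted.

import java.util
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

object ChildListenerDemo {
  def main(args: Array[String]): Unit = {
    val zkClient = new ZkClient("localhost:2181", 6000, 6000)

    val listener = new IZkChildListener {
      // Invoked on zkclient's event thread whenever the children of the watched path change.
      override def handleChildChange(parentPath: String, currentChilds: util.List[String]): Unit =
        println(s"children of $parentPath are now $currentChilds")
    }

    // Kafka registers listeners in this style on paths such as /brokers/topics and /brokers/ids.
    zkClient.subscribeChildChanges("/brokers/topics", listener)
  }
}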
ZK Connection State Listener
SessionExpirationListener implements the IZkStateListener interface and watches the connection state between the Kafka Controller and ZK. It provides the SessionExpirationListener#handleNewSession method, which is invoked whenever a new session is established with ZK and tries to elect a new leader. The method is implemented as follows:
def handleNewSession() {
  info("ZK expired; shut down all controller components and try to re-elect")
  if (controllerElector.getControllerID != config.brokerId) {
    onControllerResignation()
    inLock(controllerContext.controllerLock) {
      controllerElector.elect
    }
  } else {
    info("ZK expired, but the current controller id %d is the same as this broker id, skip re-elect".format(config.brokerId))
  }
}
When a broker establishes a new session with ZK, the method above checks whether the leader currently recorded in ZK is the local node. If it is not, it calls KafkaController#onControllerResignation to perform some state cleanup (since the local node may previously have been the leader) and then calls ZookeeperLeaderElector#elect, which uses ZK's ephemeral node mechanism to try to become the new leader. ZookeeperLeaderElector#elect is analyzed separately later; here we first look at the KafkaController#onControllerResignation implementation:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 def onControllerResignation () { debug("Controller resigning, broker id %d" .format(config.brokerId)) this .deregisterIsrChangeNotificationListener() this .deregisterReassignedPartitionsListener() this .deregisterPreferredReplicaElectionListener() if (deleteTopicManager != null ) deleteTopicManager.shutdown() if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() inLock(controllerContext.controllerLock) { this .deregisterReassignedPartitionsIsrChangeListeners() partitionStateMachine.shutdown() replicaStateMachine.shutdown() if (controllerContext.controllerChannelManager != null ) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null } controllerContext.epoch = 0 controllerContext.epochZkVersion = 0 brokerState.newState(RunningAsBroker ) info("Broker %d resigned as the controller" .format(config.brokerId)) } }
This method is triggered when the Kafka Controller is demoted from the leader role to a follower: it deregisters the ZK listeners, shuts down the topic deletion manager, the auto-rebalance scheduler, the partition and replica state machines and the channel manager, and resets the controller epoch, as shown above.
ZK Node Listeners
TopicChangeListener
TopicChangeListener implements the IZkChildListener interface and watches the /brokers/topics node. When a new topic is created or an existing topic is deleted, the corresponding callback is invoked:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def doHandleChildChange (parentPath: String , children: Seq [String ]) { inLock(controllerContext.controllerLock) { if (hasStarted.get) { try { val currentChildren = { debug("Topic change listener fired for path %s with children %s" .format(parentPath, children.mkString("," ))) children.toSet } val newTopics = currentChildren -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- currentChildren controllerContext.allTopics = currentChildren val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq) controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1.topic)) controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]" .format(newTopics, deletedTopics, addedPartitionReplicaAssignment)) if (newTopics.nonEmpty) controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet) } catch { case e: Throwable => error("Error while handling new topic" , e) } } } }
The method above uses ZK to detect the sets of newly added and deleted topics and updates the locally recorded set of available topics along with the AR sets of their partitions. For the newly added topics, the Kafka Controller calls KafkaController#onNewTopicCreation to register a PartitionModificationsListener for each topic and to switch the states of the new partitions and their replicas so that they can come online. The relevant implementation is:
def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
  info("New topic creation callback for %s".format(newPartitions.mkString(",")))
  topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  this.onNewPartitionCreation(newPartitions)
}

def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
  info("New partition creation callback for %s".format(newPartitions.mkString(",")))
  partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
  partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}
As the implementation shows, partitions and replicas are not switched to the Online state in a single step: they are first switched to the New state and then to Online. Along the way the partition is assigned a leader replica and an ISR set, and the new replicas are added to the partition's AR set.
DeleteTopicsListener
DeleteTopicsListener implements the IZkChildListener interface and watches the /admin/delete_topics node. When the administrator marks some topics for deletion, those topics are written under this ZK node, which triggers the DeleteTopicsListener#doHandleChildChange callback, implemented as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 def doHandleChildChange (parentPath: String , children: Seq [String ]) { inLock(controllerContext.controllerLock) { var topicsToBeDeleted = children.toSet debug("Delete topics listener fired for topics %s to be deleted" .format(topicsToBeDeleted.mkString("," ))) val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics if (nonExistentTopics.nonEmpty) { warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString("," )) nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic))) } topicsToBeDeleted --= nonExistentTopics if (controller.config.deleteTopicEnable) { if (topicsToBeDeleted.nonEmpty) { info("Starting topic deletion for topics " + topicsToBeDeleted.mkString("," )) topicsToBeDeleted.foreach { topic => val preferredReplicaElectionInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic) val partitionReassignmentInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic) if (preferredReplicaElectionInProgress || partitionReassignmentInProgress) controller.deleteTopicManager.markTopicIneligibleForDeletion(Set (topic)) } controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted) } } else { for (topic <- topicsToBeDeleted) { info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled" ) zkUtils.zkClient.delete(getDeleteTopicPath(topic)) } } } }
When new topics are marked for deletion, the method above fetches the set of topics to delete and checks whether each of them is valid (i.e. actually exists). Invalid entries are simply removed from the ZK node. For the valid topics, provided deletion is enabled (the delete.topic.enable configuration), the method checks whether any partition of the topic to be deleted satisfies either of the following conditions:
A partition is undergoing preferred replica election.
A partition is undergoing replica reassignment.
If either condition holds, the topic is marked as ineligible for deletion; otherwise the topic is handed over to TopicDeletionManager, which adds the topic and its partitions to its pending-deletion sets and wakes up the DeleteTopicsThread to perform the deletion. See the earlier section for how TopicDeletionManager works.
BrokerChangeListener
BrokerChangeListener implements the IZkChildListener interface and watches the /brokers/ids node. When a broker comes online or goes offline, the corresponding callback is triggered:
def doHandleChildChange(parentPath: String, currentBrokerList: Seq[String]) {
  info("Broker change listener fired for path %s with children %s"
    .format(parentPath, currentBrokerList.sorted.mkString(",")))
  inLock(controllerContext.controllerLock) {
    if (hasStarted.get) {
      ControllerStats.leaderElectionTimer.time {
        try {
          val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
          val curBrokerIds = curBrokers.map(_.id)
          val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
          // diff against the cached broker list to find newly added and dead brokers
          val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
          val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
          val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
          controllerContext.liveBrokers = curBrokers
          val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
          val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
          val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
          info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
            .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
          // maintain the network connections to the brokers, then fire the corresponding callbacks
          newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
          deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
          if (newBrokerIds.nonEmpty)
            controller.onBrokerStartup(newBrokerIdsSorted)
          if (deadBrokerIds.nonEmpty)
            controller.onBrokerFailure(deadBrokerIdsSorted)
        } catch {
          case e: Throwable => error("Error while handling broker changes", e)
        }
      }
    }
  }
}
For newly online brokers, the Kafka Controller establishes network connections to them, notifies all live brokers in the cluster of the newcomers, and switches the states of the partition replicas hosted on the new brokers so that they can serve requests. The implementation lives in the KafkaController#onBrokerStartup method:
def onBrokerStartup(newBrokers: Seq[Int]) {
  info("New broker startup callback for %s".format(newBrokers.mkString(",")))
  val newBrokersSet = newBrokers.toSet
  // propagate the updated cluster metadata to every live broker
  this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // bring the replicas hosted on the new brokers online
  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // resume any pending reassignment that involves the new brokers
  val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
    case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
  }
  partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
  // resume topic deletion if some replicas of the topics to be deleted live on the new brokers
  val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    info("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. Signaling restart of topic deletion for these topics"
      .format(replicasForTopicsToBeDeleted.mkString(","), deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
    deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  }
}
For brokers that have gone offline, the Kafka Controller closes its network connections to them and moves the replicas assigned to the failed brokers to the OfflineReplica state. If the leader replica of a partition happens to reside on a failed broker, that partition is moved to the OfflinePartition state and all live brokers in the cluster are notified. The implementation lives in the KafkaController#onBrokerFailure method:
def onBrokerFailure(deadBrokers: Seq[Int]) {
  info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
  val deadBrokersThatWereShuttingDown = deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
  val deadBrokersSet = deadBrokers.toSet
  // partitions whose leader was on a dead broker (excluding topics queued for deletion) go offline,
  // then a new leader is elected for them
  val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
    deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
      !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
  partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // all replicas hosted on the dead brokers are marked offline
  val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
  val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
  // replicas of topics queued for deletion cannot be deleted any more, mark the deletion as failed
  val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
  }
  // if no leader election was required, still propagate the updated metadata to the live brokers
  if (partitionsWithoutLeader.isEmpty) {
    this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  }
}
IsrChangeNotificationListener
IsrChangeNotificationListener implements the IZkChildListener interface and watches the /isr_change_notification node. When the ISR sets of some partitions change, the corresponding callback is triggered:
def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = {
  inLock(controller.controllerContext.controllerLock) {
    debug("ISR change notification listener fired")
    try {
      val topicAndPartitions = currentChildren.flatMap(getTopicAndPartition).toSet
      if (topicAndPartitions.nonEmpty) {
        // refresh the cached leader/ISR information and notify the live brokers
        controller.updateLeaderAndIsrCache(topicAndPartitions)
        processUpdateNotifications(topicAndPartitions)
      }
    } finally {
      // clean up the processed notification nodes
      currentChildren.map(x => controller.controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
    }
  }
}

private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
  val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
  debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
  controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
}
The logic is straightforward: the affected partitions are parsed from the notification nodes, the cached leader and ISR information is refreshed, an UpdateMetadataRequest is sent to the live brokers, and the processed notification nodes are removed from ZK. The shape of a notification node is sketched below.
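For reference, the children of /isr_change_notification are sequential nodes created by the brokers whose ISR sets changed, and getTopicAndPartition extracts the affected partitions from their JSON payload. In this Kafka version the payload looks roughly like the example below; this is shown purely for illustration and the exact field layout may differ between versions:

{"version":1,"partitions":[{"topic":"my-topic","partition":0},{"topic":"my-topic","partition":3}]}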
ZK data change listeners
LeaderChangeListener
LeaderChangeListener implements the IZkDataListener interface and watches the /controller node. When the node's data changes or the node is deleted, the corresponding callback is triggered:
def handleDataChange(dataPath: String, data: Object) {
  val shouldResign = inLock(controllerContext.controllerLock) {
    val amILeaderBeforeDataChange = amILeader
    leaderId = KafkaController.parseControllerId(data.toString)
    info("New leader is %d".format(leaderId))
    // resign only if this broker was the leader before the change but is not any more
    amILeaderBeforeDataChange && !amILeader
  }
  if (shouldResign)
    onResigningAsLeader()
}

def handleDataDeleted(dataPath: String) {
  val shouldResign = inLock(controllerContext.controllerLock) {
    debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
      .format(brokerId, dataPath))
    amILeader
  }
  if (shouldResign)
    onResigningAsLeader()
  // the /controller node is gone, try to become the new leader
  inLock(controllerContext.controllerLock) {
    elect
  }
}
The callback logic is shown above: on a data change the listener records the new leader id and resigns if this broker was the leader but no longer is; on data deletion it resigns if necessary and then attempts a new election. The ZookeeperLeaderElector#elect method is analysed in a later section, and the callback ZookeeperLeaderElector#onResigningAsLeader is simply KafkaController#onControllerResignation, which was covered earlier and is not repeated here. The content of the /controller node that handleDataChange receives is sketched below.
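As the elect method shown later in this article writes it, the data stored under /controller is a small JSON blob of the form {"version":1,"brokerid":2,"timestamp":"..."}, and KafkaController.parseControllerId only needs the brokerid field. A crude, purely illustrative way to pull that field out (a hypothetical helper, not the original implementation):

// hypothetical helper, for illustration only
def parseBrokerIdSketch(data: String): Int = {
  val brokerIdPattern = """"brokerid"\s*:\s*(\d+)""".r
  brokerIdPattern.findFirstMatchIn(data).map(_.group(1).toInt).getOrElse(-1)
}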
PartitionModificationsListener
PartitionModificationsListener implements the IZkDataListener interface and watches the /brokers/topics/{topic_name} node. When the partitions of a topic change (i.e. partitions are added, since the partition count can only grow), the corresponding callback is triggered:
def doHandleDataChange(dataPath: String, data: AnyRef) {
  inLock(controllerContext.controllerLock) {
    try {
      info(s"Partition modification triggered $data for path $dataPath")
      val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
      // partitions recorded in ZK but not yet known to the controller are the newly added ones
      val partitionsToBeAdded = partitionReplicaAssignment
        .filter(p => !controllerContext.partitionReplicaAssignment.contains(p._1))
      if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
        error("Skipping adding partitions %s for topic %s since it is currently being deleted"
          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
      else {
        if (partitionsToBeAdded.nonEmpty) {
          info("New partitions to be added %s".format(partitionsToBeAdded))
          controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
          controller.onNewPartitionCreation(partitionsToBeAdded.keySet)
        }
      }
    } catch {
      case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e)
    }
  }
}
The callback above applies to topics that are running normally: if new partitions have been added, it switches the states of the new partitions and their replicas so that they can come online and serve requests. The KafkaController#onNewPartitionCreation method it calls was analysed earlier and is not repeated here.
PreferredReplicaElectionListener
PreferredReplicaElectionListener implements the IZkDataListener interface and watches the /admin/preferred_replica_election node. It elects the preferred replica as the leader for the specified topic partitions, so that leader replicas stay evenly distributed across the cluster. The callback is implemented as follows (logging omitted):
def doHandleDataChange(dataPath: String, data: AnyRef) {
  inLock(controllerContext.controllerLock) {
    val partitionsForPreferredReplicaElection =
      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
    // skip partitions already undergoing preferred replica election and partitions of topics being deleted
    val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
    val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
    controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
  }
}
When an administrator manually requests preferred replica election for certain topic partitions, the request is written under the /admin/preferred_replica_election node, which triggers the callback above. For the requested partitions whose topics are running normally, KafkaController#onPreferredReplicaElection is then called to elect the leader replica using the PreferredReplicaPartitionLeaderSelector. The method is implemented as follows:
def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
  info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
  try {
    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
    // topics undergoing preferred replica election are temporarily ineligible for deletion
    deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
  } catch {
    case e: Throwable =>
      error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
  } finally {
    this.removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
    deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
  }
}
The PreferredReplicaPartitionLeaderSelector itself was analysed earlier and is not repeated here.
PartitionsReassignedListener
PartitionsReassignedListener implements the IZkDataListener interface and watches the /admin/reassign_partitions node. When an administrator requests replica reassignment for some topics, the request is written under this node and the corresponding callback is triggered:
def doHandleDataChange(dataPath: String, data: AnyRef) {
  debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s".format(dataPath, data))
  val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
  // ignore partitions whose reassignment is already in progress
  val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
    partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
  }
  partitionsToBeReassigned.foreach { partitionToBeReassigned =>
    inLock(controllerContext.controllerLock) {
      if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
        error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
          .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
        controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
      } else {
        val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
        controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
      }
    }
  }
}
The logic is straightforward: partitions that are already being reassigned or that belong to topics queued for deletion are skipped, and reassignment is initiated for the rest. The KafkaController#initiateReassignReplicasForTopicPartition method was analysed earlier and is not repeated here. The payload written under /admin/reassign_partitions is sketched below.
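For reference, the data that the partition reassignment tooling writes under /admin/reassign_partitions, and that ZkUtils.parsePartitionReassignmentData parses, is a JSON document along the following lines; the topic name and replica ids are just an example:

{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[1,2,3]}]}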
ReassignedPartitionsIsrChangeListener
ReassignedPartitionsIsrChangeListener implements the IZkDataListener interface and watches the state of a specific topic partition that is being reassigned, paying particular attention to changes of its ISR set. The callback is implemented as follows (some logging omitted):
def doHandleDataChange(dataPath: String, data: AnyRef) {
  inLock(controllerContext.controllerLock) {
    debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
    val topicAndPartition = TopicAndPartition(topic, partition)
    try {
      controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
        case Some(reassignedPartitionContext) =>
          val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
          newLeaderAndIsrOpt match {
            case Some(leaderAndIsr) =>
              // proceed only when every reassigned replica (RAR) has caught up and joined the ISR
              val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
              if (caughtUpReplicas == reassignedReplicas) {
                controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
              } else {
                // some replicas have not caught up yet, wait for the next notification
              }
            case None =>
              error("Error handling reassignment of partition %s to replicas %s as it was never created"
                .format(topicAndPartition, reassignedReplicas.mkString(",")))
          }
        case None => // the partition is no longer being reassigned
      }
    } catch {
      case e: Throwable => error("Error while handling partition reassignment", e)
    }
  }
}
The main job of this callback is to check whether all replicas in the partition's reassignment target set (RAR) have joined the ISR. If they have, the remaining steps of KafkaController#onPartitionReassignment are triggered; otherwise nothing is done and the listener simply waits for the next notification. It is worth reading this listener together with section 7 above to get a clearer picture of the overall replica reassignment flow.
Failover mechanism
A Kafka cluster consists of multiple broker nodes, each running a Kafka Controller instance, but only one of these instances acts as the leader while the rest are followers. When the leader goes down, the followers compete to become the new leader, which keeps the cluster available. Kafka implements this failover with the ZookeeperLeaderElector class, which elects a new leader from the followers when the current leader fails. Its fields are defined as follows:
class ZookeeperLeaderElector(controllerContext: ControllerContext, // controller context
                             electionPath: String, // /controller
                             onBecomingLeader: () => Unit, // KafkaController#onControllerFailover
                             onResigningAsLeader: () => Unit, // KafkaController#onControllerResignation
                             brokerId: Int,
                             time: Time) extends LeaderElector with Logging {

  // id of the broker currently acting as controller, -1 if unknown
  var leaderId: Int = -1
  // listener on the /controller node
  val leaderChangeListener = new LeaderChangeListener
}
The ZookeeperLeaderElector#startup method is called when the Kafka Controller starts, enabling the failover mechanism. It registers the LeaderChangeListener on the /controller node and then attempts to become the leader. The method is implemented as follows:
def startup {
  inLock(controllerContext.controllerLock) {
    controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
    elect
  }
}
The ZookeeperLeaderElector#elect method, mentioned several times already, performs the actual leader election. It is triggered in the following three scenarios:

when the Kafka Controller instance starts up;
when the data under the /controller ZK node is removed;
when a broker re-establishes its session with ZK.
Let us now look at the implementation of ZookeeperLeaderElector#elect. It relies on ZK's ephemeral node mechanism to compete for the leader role and returns whether the current broker is the new leader:
def elect: Boolean = {
  val timestamp = time.milliseconds.toString
  val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

  // if another broker has already been elected, stop right away
  leaderId = this.getControllerID
  if (leaderId != -1) {
    debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
    return amILeader
  }

  try {
    // try to create the ephemeral /controller node; success means this broker wins the election
    val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
      electString,
      controllerContext.zkUtils.zkConnection.getZookeeper,
      JaasUtils.isZkSecurityEnabled)
    zkCheckedEphemeral.create()
    info(brokerId + " successfully elected as leader")
    leaderId = brokerId
    onBecomingLeader()
  } catch {
    case _: ZkNodeExistsException =>
      // another broker created the node first, record it as the new leader
      leaderId = getControllerID
    case e2: Throwable =>
      error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
      resign()
  }
  amILeader
}
Leader election based on ZK ephemeral nodes is a typical and mature approach that is used in many distributed systems. The key property of an ephemeral node is that it is removed automatically when the broker that created it loses its connection to ZK, and any other node that tries to create the same node while it still exists gets a ZkNodeExistsException. If the current broker wins the election, the ZookeeperLeaderElector#onBecomingLeader callback is invoked, which corresponds to KafkaController#onControllerFailover:
def onControllerFailover() {
  if (isRunning) {
    info("Broker %d starting become controller state transition".format(config.brokerId))
    // 1. read the current controller epoch from ZK
    this.readControllerEpochFromZookeeper()
    // 2. increment the controller epoch
    this.incrementControllerEpoch(zkUtils.zkClient)
    // 3. register the ZK listeners described in the previous sections
    this.registerReassignedPartitionsListener()
    this.registerIsrChangeNotificationListener()
    this.registerPreferredReplicaElectionListener()
    partitionStateMachine.registerListeners()
    replicaStateMachine.registerListeners()
    // 4. initialize the controller context
    this.initializeControllerContext()
    // 5. propagate the current metadata to all live brokers
    this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    // 6. start the replica state machine
    replicaStateMachine.startup()
    // 7. start the partition state machine
    partitionStateMachine.startup()
    // 8. register a PartitionModificationsListener for every known topic
    controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
    // 9. resume any pending partition reassignment
    this.maybeTriggerPartitionReassignment()
    // 10. resume any pending preferred replica election
    this.maybeTriggerPreferredReplicaElection()
    // 11. optionally start the automatic partition rebalance task
    if (config.autoLeaderRebalanceEnable) {
      info("starting the partition rebalance scheduler")
      autoRebalanceScheduler.startup()
      autoRebalanceScheduler.schedule("partition-rebalance-thread",
        checkAndTriggerPartitionRebalance,
        5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    }
    // 12. start the topic deletion manager
    deleteTopicManager.start()
  } else
    info("Controller has been shut down, aborting startup/failover")
}
The method above performs the initialization work a broker needs once it has been elected the new leader. Several of these steps were analysed earlier, such as starting and initializing the state machines and the partition replica reassignment mechanism; below we focus on steps 4 and 11 (numbered in the comments above).
Initializing the controller context
We looked at ControllerContext, the class that manages the Kafka Controller's context, earlier. Step 4 initializes this context when a broker switches from the follower role to the leader role. The implementation lives in the KafkaController#initializeControllerContext method:
private def initializeControllerContext() {
  // load the live brokers, topics and replica assignment from ZK
  controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster.toSet
  controllerContext.allTopics = zkUtils.getAllTopics.toSet
  controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
  controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
  controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
  // load the leader and ISR information of all partitions
  this.updateLeaderAndIsrCache()
  // establish network connections to all live brokers
  this.startChannelManager()
  // restore any in-progress preferred replica elections, reassignments and topic deletions
  this.initializePreferredReplicaElection()
  this.initializePartitionReassignment()
  this.initializeTopicDeletion()
}
Partition rebalancing mechanism
The partition rebalancing described here is different from the partition assignment mechanism covered earlier for consumers and the GroupCoordinator component: partition assignment distributes partitions among the consumers of a group, whereas partition rebalancing tries to spread the leader replicas of topic partitions evenly across broker nodes so that the load on each broker stays balanced.

Step 11 uses the auto.leader.rebalance.enable configuration to decide whether to start the partition-rebalance-thread scheduled task, which rebalances the cluster's topic partitions to keep the brokers' load balanced. When a topic is created, its partitions and their leader replicas are spread as evenly as possible across the brokers, but as the cluster keeps running some brokers may fail, gradually skewing the number of leader replicas per broker; some brokers end up more heavily loaded, which ultimately hurts Kafka's performance. The partition-rebalance-thread task proactively detects overloaded brokers and rebalances partition leader replicas.
The logic lives in the KafkaController#checkAndTriggerPartitionRebalance method:
private def checkAndTriggerPartitionRebalance(): Unit = {
  if (isActive) {
    trace("checking need to trigger partition rebalance")
    // group all partitions (except those being deleted) by their preferred replica, i.e. the first replica in the AR list
    var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
    inLock(controllerContext.controllerLock) {
      preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment
        .filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic))
        .groupBy { case (_, assignedReplicas) => assignedReplicas.head }
    }
    debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
    preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) =>
      var imbalanceRatio: Double = 0
      var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
      inLock(controllerContext.controllerLock) {
        // partitions whose current leader is not the preferred replica
        topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
          controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
            controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
        }
        debug("topics not in preferred replica " + topicsNotInPreferredReplica)
        val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
        val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
        imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
        trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
      }
      // trigger preferred replica election when the imbalance ratio exceeds the configured threshold
      if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
        topicsNotInPreferredReplica.keys.foreach { topicPartition =>
          inLock(controllerContext.controllerLock) {
            if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
              controllerContext.partitionsBeingReassigned.isEmpty &&
              controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
              !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
              controllerContext.allTopics.contains(topicPartition.topic)) {
              onPreferredReplicaElection(Set(topicPartition), isTriggeredByAutoRebalance = true)
            }
          }
        }
      }
    }
  }
}
The partition leader rebalancing flow can be summarized as follows:

collect all partitions and their replica assignments, grouped by preferred replica (the first replica in the AR list);
compute the imbalance ratio of each broker;
when a broker's imbalance ratio exceeds the threshold, trigger preferred replica election subject to a few conditions.
A broker's imbalance ratio is the number of partitions for which the broker is the preferred replica but is not the current leader, divided by the total number of partitions for which the broker is the preferred replica (see the worked sketch after this list). When the ratio exceeds the leader.imbalance.per.broker.percentage configuration, preferred replica election is triggered for a partition, to restore the broker's load balance, only if all of the following hold:

the broker in question is alive;
no partition is currently undergoing replica reassignment;
no partition is currently undergoing preferred replica election;
the partition's topic is valid and running normally (i.e. known to the controller and not queued for deletion).
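To make the ratio concrete, here is a minimal, self-contained sketch of the same computation outside the controller; the partition names, leader ids and numbers are made up for illustration, while the real code works on ControllerContext as shown above:

object ImbalanceRatioSketch extends App {
  // broker 1 is the preferred replica for four partitions but currently leads only two of them
  val preferredBroker = 1
  val currentLeaderByPartition = Map(
    "t-0" -> 1, // led by the preferred replica
    "t-1" -> 1, // led by the preferred replica
    "t-2" -> 2, // leader moved away, e.g. after an earlier failure of broker 1
    "t-3" -> 3  // leader moved away
  )

  val total = currentLeaderByPartition.size
  val notLedByPreferred = currentLeaderByPartition.count { case (_, leader) => leader != preferredBroker }
  val imbalanceRatio = notLedByPreferred.toDouble / total

  // with leader.imbalance.per.broker.percentage at its usual default of 10 (i.e. 10%),
  // a ratio of 0.50 exceeds 0.10 and would trigger preferred replica election
  println(f"imbalance ratio for broker $preferredBroker = $imbalanceRatio%.2f")
}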
Controlled Shutdown mechanism
The BrokerChangeListener discussed above handles brokers going online and offline, where going offline means the broker has failed. In practice there is another offline scenario: the administrator takes a broker down deliberately, for example to migrate machines, upgrade software, or change Kafka configuration. In this scenario the broker is running normally, and Kafka offers a gentler way to take it out of service: the Controlled Shutdown mechanism (the broker-side settings that govern it are sketched right after the list below). Compared with simply killing the broker, a controlled shutdown has the following advantages:

all log data can be flushed to disk, avoiding log recovery when the broker comes back online;
partitions whose leader replica resides on the broker being shut down can have their leadership migrated first, keeping those partitions available.
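Controlled shutdown is governed by a handful of broker configuration entries. The names below exist in this Kafka version; the values are only an illustrative example and defaults may differ between versions:

controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000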
When a broker with controlled shutdown enabled is shut down gracefully, it sends a ControlledShutdownRequest to the Kafka Controller. On the controller side the request is handled by the KafkaController#shutdownBroker method, implemented as follows:
def shutdownBroker(id: Int): Set[TopicAndPartition] = {
  // only the active controller can handle a controlled shutdown request
  if (!isActive) {
    throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
  }

  controllerContext.brokerShutdownLock synchronized {
    info("Shutting down broker " + id)

    inLock(controllerContext.controllerLock) {
      if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
        throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
      // record the broker as shutting down
      controllerContext.shuttingDownBrokerIds.add(id)
    }

    // collect every partition hosted on the broker together with its replication factor
    val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
      inLock(controllerContext.controllerLock) {
        controllerContext.partitionsOnBroker(id).map(topicAndPartition =>
          (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
      }

    allPartitionsAndReplicationFactorOnBroker.foreach { case (topicAndPartition, replicationFactor) =>
      inLock(controllerContext.controllerLock) {
        controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
          if (replicationFactor > 1) {
            if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
              // the leader replica is on the broker being shut down: move leadership elsewhere
              partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                controlledShutdownPartitionLeaderSelector)
            } else {
              // only a follower replica is on the broker: stop it and mark it offline
              try {
                brokerRequestBatch.newBatch()
                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
                  topicAndPartition.partition, deletePartition = false)
                brokerRequestBatch.sendRequestsToBrokers(epoch)
              } catch {
                // exception handling elided in this excerpt
              }
              replicaStateMachine.handleStateChanges(
                Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, id)), OfflineReplica)
            }
          }
        }
      }
    }

    // return the partitions for which the broker still leads a replicated partition
    def replicatedPartitionsBrokerLeads(): Iterable[TopicAndPartition] =
      inLock(controllerContext.controllerLock) {
        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
        controllerContext.partitionLeadershipInfo.filter { case (topicAndPartition, leaderIsrAndControllerEpoch) =>
          leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
            controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
        }.keys
      }
    replicatedPartitionsBrokerLeads().toSet
  }
}
For each partition hosted on the broker being shut down, if the partition is replicated (replication factor greater than 1) the controller checks whether its leader replica resides on that broker. If it does, the ControlledShutdownLeaderSelector is used to pick a new leader replica and ISR set for the partition, and the result is propagated to the relevant brokers. If the leader is elsewhere, the controller simply sends a StopReplicaRequest to the broker being shut down to stop the follower replica there; this may shrink the partition's ISR set, in which case the change is likewise propagated to the relevant brokers in the cluster.
Summary
This article described the role and implementation of the Kafka Controller component. A Kafka cluster runs many broker nodes that start out independent of one another; the Kafka Controller coordinates them so that they serve clients as a single cluster. It manages the state of all partitions and replicas, maintains the cluster context, drives replica reassignment, partition rebalancing, the Controlled Shutdown mechanism and failover, and handles the interaction with ZK. In short, it can be regarded as the central controller of the Kafka cluster.